blob: 1a525dc91359b4481e65d2f23e44b3f618c395e1 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import threading
27import random
Guido van Rossum28524c72007-02-27 05:47:44 +000028import unittest
Benjamin Peterson59406a92009-03-26 17:10:29 +000029import warnings
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000030import weakref
31import gc
32import abc
33from itertools import chain, cycle, count
34from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000035from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000036
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000037import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000038import io # C implementation of io
39import _pyio as pyio # Python implementation of io
Guido van Rossum28524c72007-02-27 05:47:44 +000040
Guido van Rossuma9e20242007-03-08 00:43:48 +000041
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000042def _default_chunk_size():
43 """Get the default TextIOWrapper chunk size"""
44 with open(__file__, "r", encoding="latin1") as f:
45 return f._CHUNK_SIZE
46
47
48class MockRawIO:
Guido van Rossuma9e20242007-03-08 00:43:48 +000049
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000050 def __init__(self, read_stack=()):
51 self._read_stack = list(read_stack)
52 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000053 self._reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000054
55 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000056 self._reads += 1
Guido van Rossum68bbcd22007-02-27 17:19:33 +000057 try:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000058 return self._read_stack.pop(0)
Guido van Rossum68bbcd22007-02-27 17:19:33 +000059 except:
Guido van Rossum78892e42007-04-06 17:31:18 +000060 return b""
Guido van Rossum68bbcd22007-02-27 17:19:33 +000061
Guido van Rossum01a27522007-03-07 01:00:12 +000062 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000063 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000064 return len(b)
65
66 def writable(self):
67 return True
68
Guido van Rossum68bbcd22007-02-27 17:19:33 +000069 def fileno(self):
70 return 42
71
72 def readable(self):
73 return True
74
Guido van Rossum01a27522007-03-07 01:00:12 +000075 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000076 return True
77
Guido van Rossum01a27522007-03-07 01:00:12 +000078 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000079 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000080
81 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000082 return 0 # same comment as above
83
84 def readinto(self, buf):
85 self._reads += 1
86 max_len = len(buf)
87 try:
88 data = self._read_stack[0]
89 except IndexError:
90 return 0
91 if data is None:
92 del self._read_stack[0]
93 return None
94 n = len(data)
95 if len(data) <= max_len:
96 del self._read_stack[0]
97 buf[:n] = data
98 return n
99 else:
100 buf[:] = data[:max_len]
101 self._read_stack[0] = data[max_len:]
102 return max_len
103
104 def truncate(self, pos=None):
105 return pos
106
107class CMockRawIO(MockRawIO, io.RawIOBase):
108 pass
109
110class PyMockRawIO(MockRawIO, pyio.RawIOBase):
111 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000112
Guido van Rossuma9e20242007-03-08 00:43:48 +0000113
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000114class MisbehavedRawIO(MockRawIO):
115 def write(self, b):
116 return super().write(b) * 2
117
118 def read(self, n=None):
119 return super().read(n) * 2
120
121 def seek(self, pos, whence):
122 return -123
123
124 def tell(self):
125 return -456
126
127 def readinto(self, buf):
128 super().readinto(buf)
129 return len(buf) * 5
130
131class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
132 pass
133
134class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
135 pass
136
137
138class CloseFailureIO(MockRawIO):
139 closed = 0
140
141 def close(self):
142 if not self.closed:
143 self.closed = 1
144 raise IOError
145
146class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
147 pass
148
149class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
150 pass
151
152
153class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000154
155 def __init__(self, data):
156 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000157 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000158
159 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000160 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000161 self.read_history.append(None if res is None else len(res))
162 return res
163
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000164 def readinto(self, b):
165 res = super().readinto(b)
166 self.read_history.append(res)
167 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000168
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000169class CMockFileIO(MockFileIO, io.BytesIO):
170 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000171
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000172class PyMockFileIO(MockFileIO, pyio.BytesIO):
173 pass
174
175
176class MockNonBlockWriterIO:
177
178 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000179 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000180 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000181
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000182 def pop_written(self):
183 s = b"".join(self._write_stack)
184 self._write_stack[:] = []
185 return s
186
187 def block_on(self, char):
188 """Block when a given char is encountered."""
189 self._blocker_char = char
190
191 def readable(self):
192 return True
193
194 def seekable(self):
195 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000196
Guido van Rossum01a27522007-03-07 01:00:12 +0000197 def writable(self):
198 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000199
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000200 def write(self, b):
201 b = bytes(b)
202 n = -1
203 if self._blocker_char:
204 try:
205 n = b.index(self._blocker_char)
206 except ValueError:
207 pass
208 else:
209 self._blocker_char = None
210 self._write_stack.append(b[:n])
211 raise self.BlockingIOError(0, "test blocking", n)
212 self._write_stack.append(b)
213 return len(b)
214
215class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
216 BlockingIOError = io.BlockingIOError
217
218class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
219 BlockingIOError = pyio.BlockingIOError
220
Guido van Rossuma9e20242007-03-08 00:43:48 +0000221
Guido van Rossum28524c72007-02-27 05:47:44 +0000222class IOTest(unittest.TestCase):
223
Neal Norwitze7789b12008-03-24 06:18:09 +0000224 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000225 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000226
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000227 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000228 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000229
Guido van Rossum28524c72007-02-27 05:47:44 +0000230 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000231 self.assertEqual(f.write(b"blah."), 5)
232 self.assertEqual(f.seek(0), 0)
233 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000234 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000235 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000236 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000237 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000238 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000239 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000240 self.assertEqual(f.seek(-1, 2), 13)
241 self.assertEqual(f.tell(), 13)
242 self.assertEqual(f.truncate(12), 12)
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000243 self.assertEqual(f.tell(), 12)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000244 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000245
Guido van Rossum9b76da62007-04-11 01:09:03 +0000246 def read_ops(self, f, buffered=False):
247 data = f.read(5)
248 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000249 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000250 self.assertEqual(f.readinto(data), 5)
251 self.assertEqual(data, b" worl")
252 self.assertEqual(f.readinto(data), 2)
253 self.assertEqual(len(data), 5)
254 self.assertEqual(data[:2], b"d\n")
255 self.assertEqual(f.seek(0), 0)
256 self.assertEqual(f.read(20), b"hello world\n")
257 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000258 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000259 self.assertEqual(f.seek(-6, 2), 6)
260 self.assertEqual(f.read(5), b"world")
261 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000262 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000263 self.assertEqual(f.seek(-6, 1), 5)
264 self.assertEqual(f.read(5), b" worl")
265 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000266 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000267 if buffered:
268 f.seek(0)
269 self.assertEqual(f.read(), b"hello world\n")
270 f.seek(6)
271 self.assertEqual(f.read(), b"world\n")
272 self.assertEqual(f.read(), b"")
273
Guido van Rossum34d69e52007-04-10 20:08:41 +0000274 LARGE = 2**31
275
Guido van Rossum53807da2007-04-10 19:01:47 +0000276 def large_file_ops(self, f):
277 assert f.readable()
278 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000279 self.assertEqual(f.seek(self.LARGE), self.LARGE)
280 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000281 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000282 self.assertEqual(f.tell(), self.LARGE + 3)
283 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000284 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000285 self.assertEqual(f.tell(), self.LARGE + 2)
286 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000287 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000288 self.assertEqual(f.tell(), self.LARGE + 1)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000289 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
290 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000291 self.assertEqual(f.read(2), b"x")
292
Guido van Rossum28524c72007-02-27 05:47:44 +0000293 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000294 with self.open(support.TESTFN, "wb", buffering=0) as f:
295 self.assertEqual(f.readable(), False)
296 self.assertEqual(f.writable(), True)
297 self.assertEqual(f.seekable(), True)
298 self.write_ops(f)
299 with self.open(support.TESTFN, "rb", buffering=0) as f:
300 self.assertEqual(f.readable(), True)
301 self.assertEqual(f.writable(), False)
302 self.assertEqual(f.seekable(), True)
303 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000304
Guido van Rossum87429772007-04-10 21:06:59 +0000305 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000306 with self.open(support.TESTFN, "wb") as f:
307 self.assertEqual(f.readable(), False)
308 self.assertEqual(f.writable(), True)
309 self.assertEqual(f.seekable(), True)
310 self.write_ops(f)
311 with self.open(support.TESTFN, "rb") as f:
312 self.assertEqual(f.readable(), True)
313 self.assertEqual(f.writable(), False)
314 self.assertEqual(f.seekable(), True)
315 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000316
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000317 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000318 with self.open(support.TESTFN, "wb") as f:
319 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
320 with self.open(support.TESTFN, "rb") as f:
321 self.assertEqual(f.readline(), b"abc\n")
322 self.assertEqual(f.readline(10), b"def\n")
323 self.assertEqual(f.readline(2), b"xy")
324 self.assertEqual(f.readline(4), b"zzy\n")
325 self.assertEqual(f.readline(), b"foo\x00bar\n")
326 self.assertEqual(f.readline(), b"another line")
327 self.assertRaises(TypeError, f.readline, 5.3)
328 with self.open(support.TESTFN, "r") as f:
329 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000330
Guido van Rossum28524c72007-02-27 05:47:44 +0000331 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000332 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000333 self.write_ops(f)
334 data = f.getvalue()
335 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000336 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000337 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000338
Guido van Rossum53807da2007-04-10 19:01:47 +0000339 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000340 # On Windows and Mac OSX this test comsumes large resources; It takes
341 # a long time to build the >2GB file and takes >2GB of disk space
342 # therefore the resource must be enabled to run this test.
343 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000344 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000345 print("\nTesting large file ops skipped on %s." % sys.platform,
346 file=sys.stderr)
347 print("It requires %d bytes and a long time." % self.LARGE,
348 file=sys.stderr)
349 print("Use 'regrtest.py -u largefile test_io' to run it.",
350 file=sys.stderr)
351 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000352 with self.open(support.TESTFN, "w+b", 0) as f:
353 self.large_file_ops(f)
354 with self.open(support.TESTFN, "w+b") as f:
355 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000356
357 def test_with_open(self):
358 for bufsize in (0, 1, 100):
359 f = None
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000360 with open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000361 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000362 self.assertEqual(f.closed, True)
363 f = None
364 try:
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000365 with open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000366 1/0
367 except ZeroDivisionError:
368 self.assertEqual(f.closed, True)
369 else:
370 self.fail("1/0 didn't raise an exception")
371
Antoine Pitrou08838b62009-01-21 00:55:13 +0000372 # issue 5008
373 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000374 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000375 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000376 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000377 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000378 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000379 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000380 with self.open(support.TESTFN, "a") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000381 self.assert_(f.tell() > 0)
382
Guido van Rossum87429772007-04-10 21:06:59 +0000383 def test_destructor(self):
384 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000385 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000386 def __del__(self):
387 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000388 try:
389 f = super().__del__
390 except AttributeError:
391 pass
392 else:
393 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000394 def close(self):
395 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000396 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000397 def flush(self):
398 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000399 super().flush()
400 f = MyFileIO(support.TESTFN, "wb")
401 f.write(b"xxx")
402 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000403 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000404 self.assertEqual(record, [1, 2, 3])
Benjamin Peterson45cec322009-04-24 23:14:50 +0000405 with open(support.TESTFN, "rb") as f:
406 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000407
408 def _check_base_destructor(self, base):
409 record = []
410 class MyIO(base):
411 def __init__(self):
412 # This exercises the availability of attributes on object
413 # destruction.
414 # (in the C version, close() is called by the tp_dealloc
415 # function, not by __del__)
416 self.on_del = 1
417 self.on_close = 2
418 self.on_flush = 3
419 def __del__(self):
420 record.append(self.on_del)
421 try:
422 f = super().__del__
423 except AttributeError:
424 pass
425 else:
426 f()
427 def close(self):
428 record.append(self.on_close)
429 super().close()
430 def flush(self):
431 record.append(self.on_flush)
432 super().flush()
433 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000434 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000435 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000436 self.assertEqual(record, [1, 2, 3])
437
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000438 def test_IOBase_destructor(self):
439 self._check_base_destructor(self.IOBase)
440
441 def test_RawIOBase_destructor(self):
442 self._check_base_destructor(self.RawIOBase)
443
444 def test_BufferedIOBase_destructor(self):
445 self._check_base_destructor(self.BufferedIOBase)
446
447 def test_TextIOBase_destructor(self):
448 self._check_base_destructor(self.TextIOBase)
449
Guido van Rossum87429772007-04-10 21:06:59 +0000450 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000451 with self.open(support.TESTFN, "wb") as f:
452 f.write(b"xxx")
453 with self.open(support.TESTFN, "rb") as f:
454 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000455
Guido van Rossumd4103952007-04-12 05:44:49 +0000456 def test_array_writes(self):
457 a = array.array('i', range(10))
Antoine Pitrouc3b39242009-01-03 16:59:18 +0000458 n = len(a.tostring())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000459 with self.open(support.TESTFN, "wb", 0) as f:
460 self.assertEqual(f.write(a), n)
461 with self.open(support.TESTFN, "wb") as f:
462 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000463
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000464 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000465 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000466 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000467
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000468 def test_read_closed(self):
469 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000470 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000471 with self.open(support.TESTFN, "r") as f:
472 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000473 self.assertEqual(file.read(), "egg\n")
474 file.seek(0)
475 file.close()
476 self.assertRaises(ValueError, file.read)
477
478 def test_no_closefd_with_filename(self):
479 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000480 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000481
482 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000483 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000484 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000485 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000486 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000487 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000488 self.assertEqual(file.buffer.raw.closefd, False)
489
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000490 def test_garbage_collection(self):
491 # FileIO objects are collected, and collecting them flushes
492 # all data to disk.
493 f = self.FileIO(support.TESTFN, "wb")
494 f.write(b"abcxxx")
495 f.f = f
496 wr = weakref.ref(f)
497 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000498 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000499 self.assert_(wr() is None, wr)
500 with open(support.TESTFN, "rb") as f:
501 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000502
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000503 def test_unbounded_file(self):
504 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
505 zero = "/dev/zero"
506 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000507 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000508 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000509 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000510 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000511 self.skipTest("test requires at least 2GB of memory")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000512 with open(zero, "rb", buffering=0) as f:
513 self.assertRaises(OverflowError, f.read)
514 with open(zero, "rb") as f:
515 self.assertRaises(OverflowError, f.read)
516 with open(zero, "r") as f:
517 self.assertRaises(OverflowError, f.read)
518
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000519class CIOTest(IOTest):
520 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000521
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000522class PyIOTest(IOTest):
523 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000524
Guido van Rossuma9e20242007-03-08 00:43:48 +0000525
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000526class CommonBufferedTests:
527 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
528
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000529 def test_detach(self):
530 raw = self.MockRawIO()
531 buf = self.tp(raw)
532 self.assertIs(buf.detach(), raw)
533 self.assertRaises(ValueError, buf.detach)
534
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000535 def test_fileno(self):
536 rawio = self.MockRawIO()
537 bufio = self.tp(rawio)
538
539 self.assertEquals(42, bufio.fileno())
540
541 def test_no_fileno(self):
542 # XXX will we always have fileno() function? If so, kill
543 # this test. Else, write it.
544 pass
545
546 def test_invalid_args(self):
547 rawio = self.MockRawIO()
548 bufio = self.tp(rawio)
549 # Invalid whence
550 self.assertRaises(ValueError, bufio.seek, 0, -1)
551 self.assertRaises(ValueError, bufio.seek, 0, 3)
552
553 def test_override_destructor(self):
554 tp = self.tp
555 record = []
556 class MyBufferedIO(tp):
557 def __del__(self):
558 record.append(1)
559 try:
560 f = super().__del__
561 except AttributeError:
562 pass
563 else:
564 f()
565 def close(self):
566 record.append(2)
567 super().close()
568 def flush(self):
569 record.append(3)
570 super().flush()
571 rawio = self.MockRawIO()
572 bufio = MyBufferedIO(rawio)
573 writable = bufio.writable()
574 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000575 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000576 if writable:
577 self.assertEqual(record, [1, 2, 3])
578 else:
579 self.assertEqual(record, [1, 2])
580
581 def test_context_manager(self):
582 # Test usability as a context manager
583 rawio = self.MockRawIO()
584 bufio = self.tp(rawio)
585 def _with():
586 with bufio:
587 pass
588 _with()
589 # bufio should now be closed, and using it a second time should raise
590 # a ValueError.
591 self.assertRaises(ValueError, _with)
592
593 def test_error_through_destructor(self):
594 # Test that the exception state is not modified by a destructor,
595 # even if close() fails.
596 rawio = self.CloseFailureIO()
597 def f():
598 self.tp(rawio).xyzzy
599 with support.captured_output("stderr") as s:
600 self.assertRaises(AttributeError, f)
601 s = s.getvalue().strip()
602 if s:
603 # The destructor *may* have printed an unraisable error, check it
604 self.assertEqual(len(s.splitlines()), 1)
605 self.assert_(s.startswith("Exception IOError: "), s)
606 self.assert_(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000607
608
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000609class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
610 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000611
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000612 def test_constructor(self):
613 rawio = self.MockRawIO([b"abc"])
614 bufio = self.tp(rawio)
615 bufio.__init__(rawio)
616 bufio.__init__(rawio, buffer_size=1024)
617 bufio.__init__(rawio, buffer_size=16)
618 self.assertEquals(b"abc", bufio.read())
619 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
620 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
621 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
622 rawio = self.MockRawIO([b"abc"])
623 bufio.__init__(rawio)
624 self.assertEquals(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000625
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000626 def test_read(self):
627 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
628 bufio = self.tp(rawio)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000629 self.assertEquals(b"abcdef", bufio.read(6))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000630 # Invalid args
631 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000632
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000633 def test_read1(self):
634 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
635 bufio = self.tp(rawio)
636 self.assertEquals(b"a", bufio.read(1))
637 self.assertEquals(b"b", bufio.read1(1))
638 self.assertEquals(rawio._reads, 1)
639 self.assertEquals(b"c", bufio.read1(100))
640 self.assertEquals(rawio._reads, 1)
641 self.assertEquals(b"d", bufio.read1(100))
642 self.assertEquals(rawio._reads, 2)
643 self.assertEquals(b"efg", bufio.read1(100))
644 self.assertEquals(rawio._reads, 3)
645 self.assertEquals(b"", bufio.read1(100))
646 # Invalid args
647 self.assertRaises(ValueError, bufio.read1, -1)
648
649 def test_readinto(self):
650 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
651 bufio = self.tp(rawio)
652 b = bytearray(2)
653 self.assertEquals(bufio.readinto(b), 2)
654 self.assertEquals(b, b"ab")
655 self.assertEquals(bufio.readinto(b), 2)
656 self.assertEquals(b, b"cd")
657 self.assertEquals(bufio.readinto(b), 2)
658 self.assertEquals(b, b"ef")
659 self.assertEquals(bufio.readinto(b), 1)
660 self.assertEquals(b, b"gf")
661 self.assertEquals(bufio.readinto(b), 0)
662 self.assertEquals(b, b"gf")
663
664 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000665 data = b"abcdefghi"
666 dlen = len(data)
667
668 tests = [
669 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
670 [ 100, [ 3, 3, 3], [ dlen ] ],
671 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
672 ]
673
674 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000675 rawio = self.MockFileIO(data)
676 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000677 pos = 0
678 for nbytes in buf_read_sizes:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000679 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000680 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000681 # this is mildly implementation-dependent
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000682 self.assertEquals(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000683
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000684 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000685 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000686 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
687 bufio = self.tp(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000688
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000689 self.assertEquals(b"abcd", bufio.read(6))
690 self.assertEquals(b"e", bufio.read(1))
691 self.assertEquals(b"fg", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000692 self.assertEquals(b"", bufio.peek(1))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000693 self.assert_(None is bufio.read())
694 self.assertEquals(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000695
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000696 def test_read_past_eof(self):
697 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
698 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000699
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000700 self.assertEquals(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000701
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000702 def test_read_all(self):
703 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
704 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000705
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000706 self.assertEquals(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000707
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000708 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000709 try:
710 # Write out many bytes with exactly the same number of 0's,
711 # 1's... 255's. This will help us check that concurrent reading
712 # doesn't duplicate or forget contents.
713 N = 1000
714 l = list(range(256)) * N
715 random.shuffle(l)
716 s = bytes(bytearray(l))
717 with io.open(support.TESTFN, "wb") as f:
718 f.write(s)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000719 with io.open(support.TESTFN, self.read_mode, buffering=0) as raw:
720 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000721 errors = []
722 results = []
723 def f():
724 try:
725 # Intra-buffer read then buffer-flushing read
726 for n in cycle([1, 19]):
727 s = bufio.read(n)
728 if not s:
729 break
730 # list.append() is atomic
731 results.append(s)
732 except Exception as e:
733 errors.append(e)
734 raise
735 threads = [threading.Thread(target=f) for x in range(20)]
736 for t in threads:
737 t.start()
738 time.sleep(0.02) # yield
739 for t in threads:
740 t.join()
741 self.assertFalse(errors,
742 "the following exceptions were caught: %r" % errors)
743 s = b''.join(results)
744 for i in range(256):
745 c = bytes(bytearray([i]))
746 self.assertEqual(s.count(c), N)
747 finally:
748 support.unlink(support.TESTFN)
749
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000750 def test_misbehaved_io(self):
751 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
752 bufio = self.tp(rawio)
753 self.assertRaises(IOError, bufio.seek, 0)
754 self.assertRaises(IOError, bufio.tell)
755
756class CBufferedReaderTest(BufferedReaderTest):
757 tp = io.BufferedReader
758
759 def test_constructor(self):
760 BufferedReaderTest.test_constructor(self)
761 # The allocation can succeed on 32-bit builds, e.g. with more
762 # than 2GB RAM and a 64-bit kernel.
763 if sys.maxsize > 0x7FFFFFFF:
764 rawio = self.MockRawIO()
765 bufio = self.tp(rawio)
766 self.assertRaises((OverflowError, MemoryError, ValueError),
767 bufio.__init__, rawio, sys.maxsize)
768
769 def test_initialization(self):
770 rawio = self.MockRawIO([b"abc"])
771 bufio = self.tp(rawio)
772 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
773 self.assertRaises(ValueError, bufio.read)
774 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
775 self.assertRaises(ValueError, bufio.read)
776 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
777 self.assertRaises(ValueError, bufio.read)
778
779 def test_misbehaved_io_read(self):
780 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
781 bufio = self.tp(rawio)
782 # _pyio.BufferedReader seems to implement reading different, so that
783 # checking this is not so easy.
784 self.assertRaises(IOError, bufio.read, 10)
785
786 def test_garbage_collection(self):
787 # C BufferedReader objects are collected.
788 # The Python version has __del__, so it ends into gc.garbage instead
789 rawio = self.FileIO(support.TESTFN, "w+b")
790 f = self.tp(rawio)
791 f.f = f
792 wr = weakref.ref(f)
793 del f
Benjamin Peterson45cec322009-04-24 23:14:50 +0000794 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000795 self.assert_(wr() is None, wr)
796
797class PyBufferedReaderTest(BufferedReaderTest):
798 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000799
Guido van Rossuma9e20242007-03-08 00:43:48 +0000800
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000801class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
802 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +0000803
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000804 def test_constructor(self):
805 rawio = self.MockRawIO()
806 bufio = self.tp(rawio)
807 bufio.__init__(rawio)
808 bufio.__init__(rawio, buffer_size=1024)
809 bufio.__init__(rawio, buffer_size=16)
810 self.assertEquals(3, bufio.write(b"abc"))
811 bufio.flush()
812 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
813 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
814 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
815 bufio.__init__(rawio)
816 self.assertEquals(3, bufio.write(b"ghi"))
817 bufio.flush()
818 self.assertEquals(b"".join(rawio._write_stack), b"abcghi")
819
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000820 def test_detach_flush(self):
821 raw = self.MockRawIO()
822 buf = self.tp(raw)
823 buf.write(b"howdy!")
824 self.assertFalse(raw._write_stack)
825 buf.detach()
826 self.assertEqual(raw._write_stack, [b"howdy!"])
827
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000828 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000829 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000830 writer = self.MockRawIO()
831 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000832 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000833 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000834
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000835 def test_write_overflow(self):
836 writer = self.MockRawIO()
837 bufio = self.tp(writer, 8)
838 contents = b"abcdefghijklmnop"
839 for n in range(0, len(contents), 3):
840 bufio.write(contents[n:n+3])
841 flushed = b"".join(writer._write_stack)
842 # At least (total - 8) bytes were implicitly flushed, perhaps more
843 # depending on the implementation.
844 self.assert_(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000845
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000846 def check_writes(self, intermediate_func):
847 # Lots of writes, test the flushed output is as expected.
848 contents = bytes(range(256)) * 1000
849 n = 0
850 writer = self.MockRawIO()
851 bufio = self.tp(writer, 13)
852 # Generator of write sizes: repeat each N 15 times then proceed to N+1
853 def gen_sizes():
854 for size in count(1):
855 for i in range(15):
856 yield size
857 sizes = gen_sizes()
858 while n < len(contents):
859 size = min(next(sizes), len(contents) - n)
860 self.assertEquals(bufio.write(contents[n:n+size]), size)
861 intermediate_func(bufio)
862 n += size
863 bufio.flush()
864 self.assertEquals(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000865
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000866 def test_writes(self):
867 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000868
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000869 def test_writes_and_flushes(self):
870 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +0000871
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000872 def test_writes_and_seeks(self):
873 def _seekabs(bufio):
874 pos = bufio.tell()
875 bufio.seek(pos + 1, 0)
876 bufio.seek(pos - 1, 0)
877 bufio.seek(pos, 0)
878 self.check_writes(_seekabs)
879 def _seekrel(bufio):
880 pos = bufio.seek(0, 1)
881 bufio.seek(+1, 1)
882 bufio.seek(-1, 1)
883 bufio.seek(pos, 0)
884 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +0000885
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000886 def test_writes_and_truncates(self):
887 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +0000888
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000889 def test_write_non_blocking(self):
890 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +0000891 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +0000892
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000893 self.assertEquals(bufio.write(b"abcd"), 4)
894 self.assertEquals(bufio.write(b"efghi"), 5)
895 # 1 byte will be written, the rest will be buffered
896 raw.block_on(b"k")
897 self.assertEquals(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +0000898
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000899 # 8 bytes will be written, 8 will be buffered and the rest will be lost
900 raw.block_on(b"0")
901 try:
902 bufio.write(b"opqrwxyz0123456789")
903 except self.BlockingIOError as e:
904 written = e.characters_written
905 else:
906 self.fail("BlockingIOError should have been raised")
907 self.assertEquals(written, 16)
908 self.assertEquals(raw.pop_written(),
909 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +0000910
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000911 self.assertEquals(bufio.write(b"ABCDEFGHI"), 9)
912 s = raw.pop_written()
913 # Previously buffered bytes were flushed
914 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +0000915
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000916 def test_write_and_rewind(self):
917 raw = io.BytesIO()
918 bufio = self.tp(raw, 4)
919 self.assertEqual(bufio.write(b"abcdef"), 6)
920 self.assertEqual(bufio.tell(), 6)
921 bufio.seek(0, 0)
922 self.assertEqual(bufio.write(b"XY"), 2)
923 bufio.seek(6, 0)
924 self.assertEqual(raw.getvalue(), b"XYcdef")
925 self.assertEqual(bufio.write(b"123456"), 6)
926 bufio.flush()
927 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000928
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000929 def test_flush(self):
930 writer = self.MockRawIO()
931 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000932 bufio.write(b"abc")
933 bufio.flush()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000934 self.assertEquals(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000935
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000936 def test_destructor(self):
937 writer = self.MockRawIO()
938 bufio = self.tp(writer, 8)
939 bufio.write(b"abc")
940 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000941 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000942 self.assertEquals(b"abc", writer._write_stack[0])
943
944 def test_truncate(self):
945 # Truncate implicitly flushes the buffer.
946 with io.open(support.TESTFN, self.write_mode, buffering=0) as raw:
947 bufio = self.tp(raw, 8)
948 bufio.write(b"abcdef")
949 self.assertEqual(bufio.truncate(3), 3)
950 self.assertEqual(bufio.tell(), 3)
951 with io.open(support.TESTFN, "rb", buffering=0) as f:
952 self.assertEqual(f.read(), b"abc")
953
954 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000955 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000956 # Write out many bytes from many threads and test they were
957 # all flushed.
958 N = 1000
959 contents = bytes(range(256)) * N
960 sizes = cycle([1, 19])
961 n = 0
962 queue = deque()
963 while n < len(contents):
964 size = next(sizes)
965 queue.append(contents[n:n+size])
966 n += size
967 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +0000968 # We use a real file object because it allows us to
969 # exercise situations where the GIL is released before
970 # writing the buffer to the raw streams. This is in addition
971 # to concurrency issues due to switching threads in the middle
972 # of Python code.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000973 with io.open(support.TESTFN, self.write_mode, buffering=0) as raw:
974 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000975 errors = []
976 def f():
977 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000978 while True:
979 try:
980 s = queue.popleft()
981 except IndexError:
982 return
Antoine Pitrou87695762008-08-14 22:44:29 +0000983 bufio.write(s)
984 except Exception as e:
985 errors.append(e)
986 raise
987 threads = [threading.Thread(target=f) for x in range(20)]
988 for t in threads:
989 t.start()
990 time.sleep(0.02) # yield
991 for t in threads:
992 t.join()
993 self.assertFalse(errors,
994 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000995 bufio.close()
996 with io.open(support.TESTFN, "rb") as f:
997 s = f.read()
998 for i in range(256):
999 self.assertEquals(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001000 finally:
1001 support.unlink(support.TESTFN)
1002
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001003 def test_misbehaved_io(self):
1004 rawio = self.MisbehavedRawIO()
1005 bufio = self.tp(rawio, 5)
1006 self.assertRaises(IOError, bufio.seek, 0)
1007 self.assertRaises(IOError, bufio.tell)
1008 self.assertRaises(IOError, bufio.write, b"abcdef")
1009
Benjamin Peterson59406a92009-03-26 17:10:29 +00001010 def test_max_buffer_size_deprecation(self):
1011 with support.check_warnings() as w:
1012 warnings.simplefilter("always", DeprecationWarning)
1013 self.tp(self.MockRawIO(), 8, 12)
1014 self.assertEqual(len(w.warnings), 1)
1015 warning = w.warnings[0]
1016 self.assertTrue(warning.category is DeprecationWarning)
1017 self.assertEqual(str(warning.message),
1018 "max_buffer_size is deprecated")
1019
1020
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001021class CBufferedWriterTest(BufferedWriterTest):
1022 tp = io.BufferedWriter
1023
1024 def test_constructor(self):
1025 BufferedWriterTest.test_constructor(self)
1026 # The allocation can succeed on 32-bit builds, e.g. with more
1027 # than 2GB RAM and a 64-bit kernel.
1028 if sys.maxsize > 0x7FFFFFFF:
1029 rawio = self.MockRawIO()
1030 bufio = self.tp(rawio)
1031 self.assertRaises((OverflowError, MemoryError, ValueError),
1032 bufio.__init__, rawio, sys.maxsize)
1033
1034 def test_initialization(self):
1035 rawio = self.MockRawIO()
1036 bufio = self.tp(rawio)
1037 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1038 self.assertRaises(ValueError, bufio.write, b"def")
1039 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1040 self.assertRaises(ValueError, bufio.write, b"def")
1041 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1042 self.assertRaises(ValueError, bufio.write, b"def")
1043
1044 def test_garbage_collection(self):
1045 # C BufferedWriter objects are collected, and collecting them flushes
1046 # all data to disk.
1047 # The Python version has __del__, so it ends into gc.garbage instead
1048 rawio = self.FileIO(support.TESTFN, "w+b")
1049 f = self.tp(rawio)
1050 f.write(b"123xxx")
1051 f.x = f
1052 wr = weakref.ref(f)
1053 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001054 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001055 self.assert_(wr() is None, wr)
1056 with open(support.TESTFN, "rb") as f:
1057 self.assertEqual(f.read(), b"123xxx")
1058
1059
1060class PyBufferedWriterTest(BufferedWriterTest):
1061 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001062
Guido van Rossum01a27522007-03-07 01:00:12 +00001063class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001064
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001065 def test_constructor(self):
1066 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001067 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001068
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001069 def test_detach(self):
1070 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1071 self.assertRaises(self.UnsupportedOperation, pair.detach)
1072
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001073 def test_constructor_max_buffer_size_deprecation(self):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001074 with support.check_warnings() as w:
1075 warnings.simplefilter("always", DeprecationWarning)
1076 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
1077 self.assertEqual(len(w.warnings), 1)
1078 warning = w.warnings[0]
1079 self.assertTrue(warning.category is DeprecationWarning)
1080 self.assertEqual(str(warning.message),
1081 "max_buffer_size is deprecated")
1082
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001083 def test_constructor_with_not_readable(self):
1084 class NotReadable(MockRawIO):
1085 def readable(self):
1086 return False
1087
1088 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1089
1090 def test_constructor_with_not_writeable(self):
1091 class NotWriteable(MockRawIO):
1092 def writable(self):
1093 return False
1094
1095 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1096
1097 def test_read(self):
1098 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1099
1100 self.assertEqual(pair.read(3), b"abc")
1101 self.assertEqual(pair.read(1), b"d")
1102 self.assertEqual(pair.read(), b"ef")
1103
1104 def test_read1(self):
1105 # .read1() is delegated to the underlying reader object, so this test
1106 # can be shallow.
1107 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1108
1109 self.assertEqual(pair.read1(3), b"abc")
1110
1111 def test_readinto(self):
1112 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1113
1114 data = bytearray(5)
1115 self.assertEqual(pair.readinto(data), 5)
1116 self.assertEqual(data, b"abcde")
1117
1118 def test_write(self):
1119 w = self.MockRawIO()
1120 pair = self.tp(self.MockRawIO(), w)
1121
1122 pair.write(b"abc")
1123 pair.flush()
1124 pair.write(b"def")
1125 pair.flush()
1126 self.assertEqual(w._write_stack, [b"abc", b"def"])
1127
1128 def test_peek(self):
1129 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1130
1131 self.assertTrue(pair.peek(3).startswith(b"abc"))
1132 self.assertEqual(pair.read(3), b"abc")
1133
1134 def test_readable(self):
1135 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1136 self.assertTrue(pair.readable())
1137
1138 def test_writeable(self):
1139 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1140 self.assertTrue(pair.writable())
1141
1142 def test_seekable(self):
1143 # BufferedRWPairs are never seekable, even if their readers and writers
1144 # are.
1145 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1146 self.assertFalse(pair.seekable())
1147
1148 # .flush() is delegated to the underlying writer object and has been
1149 # tested in the test_write method.
1150
1151 def test_close_and_closed(self):
1152 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1153 self.assertFalse(pair.closed)
1154 pair.close()
1155 self.assertTrue(pair.closed)
1156
1157 def test_isatty(self):
1158 class SelectableIsAtty(MockRawIO):
1159 def __init__(self, isatty):
1160 MockRawIO.__init__(self)
1161 self._isatty = isatty
1162
1163 def isatty(self):
1164 return self._isatty
1165
1166 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1167 self.assertFalse(pair.isatty())
1168
1169 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1170 self.assertTrue(pair.isatty())
1171
1172 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1173 self.assertTrue(pair.isatty())
1174
1175 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1176 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001177
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001178class CBufferedRWPairTest(BufferedRWPairTest):
1179 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001180
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001181class PyBufferedRWPairTest(BufferedRWPairTest):
1182 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001183
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001184
1185class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1186 read_mode = "rb+"
1187 write_mode = "wb+"
1188
1189 def test_constructor(self):
1190 BufferedReaderTest.test_constructor(self)
1191 BufferedWriterTest.test_constructor(self)
1192
1193 def test_read_and_write(self):
1194 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001195 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001196
1197 self.assertEqual(b"as", rw.read(2))
1198 rw.write(b"ddd")
1199 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001200 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001201 self.assertEqual(b"ghjk", rw.read())
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001202 self.assertEquals(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001203
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001204 def test_seek_and_tell(self):
1205 raw = self.BytesIO(b"asdfghjkl")
1206 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001207
1208 self.assertEquals(b"as", rw.read(2))
1209 self.assertEquals(2, rw.tell())
1210 rw.seek(0, 0)
1211 self.assertEquals(b"asdf", rw.read(4))
1212
1213 rw.write(b"asdf")
1214 rw.seek(0, 0)
1215 self.assertEquals(b"asdfasdfl", rw.read())
1216 self.assertEquals(9, rw.tell())
1217 rw.seek(-4, 2)
1218 self.assertEquals(5, rw.tell())
1219 rw.seek(2, 1)
1220 self.assertEquals(7, rw.tell())
1221 self.assertEquals(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001222 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001223
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001224 def check_flush_and_read(self, read_func):
1225 raw = self.BytesIO(b"abcdefghi")
1226 bufio = self.tp(raw)
1227
1228 self.assertEquals(b"ab", read_func(bufio, 2))
1229 bufio.write(b"12")
1230 self.assertEquals(b"ef", read_func(bufio, 2))
1231 self.assertEquals(6, bufio.tell())
1232 bufio.flush()
1233 self.assertEquals(6, bufio.tell())
1234 self.assertEquals(b"ghi", read_func(bufio))
1235 raw.seek(0, 0)
1236 raw.write(b"XYZ")
1237 # flush() resets the read buffer
1238 bufio.flush()
1239 bufio.seek(0, 0)
1240 self.assertEquals(b"XYZ", read_func(bufio, 3))
1241
1242 def test_flush_and_read(self):
1243 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1244
1245 def test_flush_and_readinto(self):
1246 def _readinto(bufio, n=-1):
1247 b = bytearray(n if n >= 0 else 9999)
1248 n = bufio.readinto(b)
1249 return bytes(b[:n])
1250 self.check_flush_and_read(_readinto)
1251
1252 def test_flush_and_peek(self):
1253 def _peek(bufio, n=-1):
1254 # This relies on the fact that the buffer can contain the whole
1255 # raw stream, otherwise peek() can return less.
1256 b = bufio.peek(n)
1257 if n != -1:
1258 b = b[:n]
1259 bufio.seek(len(b), 1)
1260 return b
1261 self.check_flush_and_read(_peek)
1262
1263 def test_flush_and_write(self):
1264 raw = self.BytesIO(b"abcdefghi")
1265 bufio = self.tp(raw)
1266
1267 bufio.write(b"123")
1268 bufio.flush()
1269 bufio.write(b"45")
1270 bufio.flush()
1271 bufio.seek(0, 0)
1272 self.assertEquals(b"12345fghi", raw.getvalue())
1273 self.assertEquals(b"12345fghi", bufio.read())
1274
1275 def test_threads(self):
1276 BufferedReaderTest.test_threads(self)
1277 BufferedWriterTest.test_threads(self)
1278
1279 def test_writes_and_peek(self):
1280 def _peek(bufio):
1281 bufio.peek(1)
1282 self.check_writes(_peek)
1283 def _peek(bufio):
1284 pos = bufio.tell()
1285 bufio.seek(-1, 1)
1286 bufio.peek(1)
1287 bufio.seek(pos, 0)
1288 self.check_writes(_peek)
1289
1290 def test_writes_and_reads(self):
1291 def _read(bufio):
1292 bufio.seek(-1, 1)
1293 bufio.read(1)
1294 self.check_writes(_read)
1295
1296 def test_writes_and_read1s(self):
1297 def _read1(bufio):
1298 bufio.seek(-1, 1)
1299 bufio.read1(1)
1300 self.check_writes(_read1)
1301
1302 def test_writes_and_readintos(self):
1303 def _read(bufio):
1304 bufio.seek(-1, 1)
1305 bufio.readinto(bytearray(1))
1306 self.check_writes(_read)
1307
1308 def test_misbehaved_io(self):
1309 BufferedReaderTest.test_misbehaved_io(self)
1310 BufferedWriterTest.test_misbehaved_io(self)
1311
1312class CBufferedRandomTest(BufferedRandomTest):
1313 tp = io.BufferedRandom
1314
1315 def test_constructor(self):
1316 BufferedRandomTest.test_constructor(self)
1317 # The allocation can succeed on 32-bit builds, e.g. with more
1318 # than 2GB RAM and a 64-bit kernel.
1319 if sys.maxsize > 0x7FFFFFFF:
1320 rawio = self.MockRawIO()
1321 bufio = self.tp(rawio)
1322 self.assertRaises((OverflowError, MemoryError, ValueError),
1323 bufio.__init__, rawio, sys.maxsize)
1324
1325 def test_garbage_collection(self):
1326 CBufferedReaderTest.test_garbage_collection(self)
1327 CBufferedWriterTest.test_garbage_collection(self)
1328
1329class PyBufferedRandomTest(BufferedRandomTest):
1330 tp = pyio.BufferedRandom
1331
1332
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001333# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1334# properties:
1335# - A single output character can correspond to many bytes of input.
1336# - The number of input bytes to complete the character can be
1337# undetermined until the last input byte is received.
1338# - The number of input bytes can vary depending on previous input.
1339# - A single input byte can correspond to many characters of output.
1340# - The number of output characters can be undetermined until the
1341# last input byte is received.
1342# - The number of output characters can vary depending on previous input.
1343
1344class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1345 """
1346 For testing seek/tell behavior with a stateful, buffering decoder.
1347
1348 Input is a sequence of words. Words may be fixed-length (length set
1349 by input) or variable-length (period-terminated). In variable-length
1350 mode, extra periods are ignored. Possible words are:
1351 - 'i' followed by a number sets the input length, I (maximum 99).
1352 When I is set to 0, words are space-terminated.
1353 - 'o' followed by a number sets the output length, O (maximum 99).
1354 - Any other word is converted into a word followed by a period on
1355 the output. The output word consists of the input word truncated
1356 or padded out with hyphens to make its length equal to O. If O
1357 is 0, the word is output verbatim without truncating or padding.
1358 I and O are initially set to 1. When I changes, any buffered input is
1359 re-scanned according to the new I. EOF also terminates the last word.
1360 """
1361
1362 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001363 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001364 self.reset()
1365
1366 def __repr__(self):
1367 return '<SID %x>' % id(self)
1368
1369 def reset(self):
1370 self.i = 1
1371 self.o = 1
1372 self.buffer = bytearray()
1373
1374 def getstate(self):
1375 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1376 return bytes(self.buffer), i*100 + o
1377
1378 def setstate(self, state):
1379 buffer, io = state
1380 self.buffer = bytearray(buffer)
1381 i, o = divmod(io, 100)
1382 self.i, self.o = i ^ 1, o ^ 1
1383
1384 def decode(self, input, final=False):
1385 output = ''
1386 for b in input:
1387 if self.i == 0: # variable-length, terminated with period
1388 if b == ord('.'):
1389 if self.buffer:
1390 output += self.process_word()
1391 else:
1392 self.buffer.append(b)
1393 else: # fixed-length, terminate after self.i bytes
1394 self.buffer.append(b)
1395 if len(self.buffer) == self.i:
1396 output += self.process_word()
1397 if final and self.buffer: # EOF terminates the last word
1398 output += self.process_word()
1399 return output
1400
1401 def process_word(self):
1402 output = ''
1403 if self.buffer[0] == ord('i'):
1404 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1405 elif self.buffer[0] == ord('o'):
1406 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1407 else:
1408 output = self.buffer.decode('ascii')
1409 if len(output) < self.o:
1410 output += '-'*self.o # pad out with hyphens
1411 if self.o:
1412 output = output[:self.o] # truncate to output length
1413 output += '.'
1414 self.buffer = bytearray()
1415 return output
1416
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001417 codecEnabled = False
1418
1419 @classmethod
1420 def lookupTestDecoder(cls, name):
1421 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001422 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001423 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001424 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001425 incrementalencoder=None,
1426 streamreader=None, streamwriter=None,
1427 incrementaldecoder=cls)
1428
1429# Register the previous decoder for testing.
1430# Disabled by default, tests will enable it.
1431codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1432
1433
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001434class StatefulIncrementalDecoderTest(unittest.TestCase):
1435 """
1436 Make sure the StatefulIncrementalDecoder actually works.
1437 """
1438
1439 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001440 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001441 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001442 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001443 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001444 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001445 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001446 # I=0, O=6 (variable-length input, fixed-length output)
1447 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1448 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001449 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001450 # I=6, O=3 (fixed-length input > fixed-length output)
1451 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1452 # I=0, then 3; O=29, then 15 (with longer output)
1453 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1454 'a----------------------------.' +
1455 'b----------------------------.' +
1456 'cde--------------------------.' +
1457 'abcdefghijabcde.' +
1458 'a.b------------.' +
1459 '.c.------------.' +
1460 'd.e------------.' +
1461 'k--------------.' +
1462 'l--------------.' +
1463 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001464 ]
1465
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001466 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001467 # Try a few one-shot test cases.
1468 for input, eof, output in self.test_cases:
1469 d = StatefulIncrementalDecoder()
1470 self.assertEquals(d.decode(input, eof), output)
1471
1472 # Also test an unfinished decode, followed by forcing EOF.
1473 d = StatefulIncrementalDecoder()
1474 self.assertEquals(d.decode(b'oiabcd'), '')
1475 self.assertEquals(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001476
1477class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001478
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001479 def setUp(self):
1480 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1481 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001482 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001483
Guido van Rossumd0712812007-04-11 16:32:43 +00001484 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001485 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001486
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001487 def test_constructor(self):
1488 r = self.BytesIO(b"\xc3\xa9\n\n")
1489 b = self.BufferedReader(r, 1000)
1490 t = self.TextIOWrapper(b)
1491 t.__init__(b, encoding="latin1", newline="\r\n")
1492 self.assertEquals(t.encoding, "latin1")
1493 self.assertEquals(t.line_buffering, False)
1494 t.__init__(b, encoding="utf8", line_buffering=True)
1495 self.assertEquals(t.encoding, "utf8")
1496 self.assertEquals(t.line_buffering, True)
1497 self.assertEquals("\xe9\n", t.readline())
1498 self.assertRaises(TypeError, t.__init__, b, newline=42)
1499 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1500
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001501 def test_detach(self):
1502 r = self.BytesIO()
1503 b = self.BufferedWriter(r)
1504 t = self.TextIOWrapper(b)
1505 self.assertIs(t.detach(), b)
1506
1507 t = self.TextIOWrapper(b, encoding="ascii")
1508 t.write("howdy")
1509 self.assertFalse(r.getvalue())
1510 t.detach()
1511 self.assertEqual(r.getvalue(), b"howdy")
1512 self.assertRaises(ValueError, t.detach)
1513
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001514 def test_repr(self):
1515 raw = self.BytesIO("hello".encode("utf-8"))
1516 b = self.BufferedReader(raw)
1517 t = self.TextIOWrapper(b, encoding="utf-8")
1518 self.assertEqual(repr(t), "<TextIOWrapper encoding=utf-8>")
1519
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001520 def test_line_buffering(self):
1521 r = self.BytesIO()
1522 b = self.BufferedWriter(r, 1000)
1523 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001524 t.write("X")
1525 self.assertEquals(r.getvalue(), b"") # No flush happened
1526 t.write("Y\nZ")
1527 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
1528 t.write("A\rB")
1529 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
1530
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001531 def test_encoding(self):
1532 # Check the encoding attribute is always set, and valid
1533 b = self.BytesIO()
1534 t = self.TextIOWrapper(b, encoding="utf8")
1535 self.assertEqual(t.encoding, "utf8")
1536 t = self.TextIOWrapper(b)
1537 self.assert_(t.encoding is not None)
1538 codecs.lookup(t.encoding)
1539
1540 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001541 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001542 b = self.BytesIO(b"abc\n\xff\n")
1543 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001544 self.assertRaises(UnicodeError, t.read)
1545 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001546 b = self.BytesIO(b"abc\n\xff\n")
1547 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001548 self.assertRaises(UnicodeError, t.read)
1549 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001550 b = self.BytesIO(b"abc\n\xff\n")
1551 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001552 self.assertEquals(t.read(), "abc\n\n")
1553 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001554 b = self.BytesIO(b"abc\n\xff\n")
1555 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001556 self.assertEquals(t.read(), "abc\n\ufffd\n")
1557
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001558 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001559 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001560 b = self.BytesIO()
1561 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001562 self.assertRaises(UnicodeError, t.write, "\xff")
1563 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001564 b = self.BytesIO()
1565 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001566 self.assertRaises(UnicodeError, t.write, "\xff")
1567 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001568 b = self.BytesIO()
1569 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001570 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001571 t.write("abc\xffdef\n")
1572 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001573 self.assertEquals(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001574 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001575 b = self.BytesIO()
1576 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001577 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001578 t.write("abc\xffdef\n")
1579 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001580 self.assertEquals(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001581
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001582 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001583 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1584
1585 tests = [
1586 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001587 [ '', input_lines ],
1588 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1589 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1590 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001591 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001592 encodings = (
1593 'utf-8', 'latin-1',
1594 'utf-16', 'utf-16-le', 'utf-16-be',
1595 'utf-32', 'utf-32-le', 'utf-32-be',
1596 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001597
Guido van Rossum8358db22007-08-18 21:39:55 +00001598 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001599 # character in TextIOWrapper._pending_line.
1600 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001601 # XXX: str.encode() should return bytes
1602 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001603 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001604 for bufsize in range(1, 10):
1605 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001606 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1607 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001608 encoding=encoding)
1609 if do_reads:
1610 got_lines = []
1611 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001612 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001613 if c2 == '':
1614 break
1615 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001616 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001617 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001618 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001619
1620 for got_line, exp_line in zip(got_lines, exp_lines):
1621 self.assertEquals(got_line, exp_line)
1622 self.assertEquals(len(got_lines), len(exp_lines))
1623
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001624 def test_newlines_input(self):
1625 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001626 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1627 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001628 (None, normalized.decode("ascii").splitlines(True)),
1629 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001630 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1631 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1632 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001633 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001634 buf = self.BytesIO(testdata)
1635 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +00001636 self.assertEquals(txt.readlines(), expected)
1637 txt.seek(0)
1638 self.assertEquals(txt.read(), "".join(expected))
1639
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001640 def test_newlines_output(self):
1641 testdict = {
1642 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1643 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1644 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1645 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1646 }
1647 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1648 for newline, expected in tests:
1649 buf = self.BytesIO()
1650 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1651 txt.write("AAA\nB")
1652 txt.write("BB\nCCC\n")
1653 txt.write("X\rY\r\nZ")
1654 txt.flush()
1655 self.assertEquals(buf.closed, False)
1656 self.assertEquals(buf.getvalue(), expected)
1657
1658 def test_destructor(self):
1659 l = []
1660 base = self.BytesIO
1661 class MyBytesIO(base):
1662 def close(self):
1663 l.append(self.getvalue())
1664 base.close(self)
1665 b = MyBytesIO()
1666 t = self.TextIOWrapper(b, encoding="ascii")
1667 t.write("abc")
1668 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001669 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001670 self.assertEquals([b"abc"], l)
1671
1672 def test_override_destructor(self):
1673 record = []
1674 class MyTextIO(self.TextIOWrapper):
1675 def __del__(self):
1676 record.append(1)
1677 try:
1678 f = super().__del__
1679 except AttributeError:
1680 pass
1681 else:
1682 f()
1683 def close(self):
1684 record.append(2)
1685 super().close()
1686 def flush(self):
1687 record.append(3)
1688 super().flush()
1689 b = self.BytesIO()
1690 t = MyTextIO(b, encoding="ascii")
1691 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001692 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001693 self.assertEqual(record, [1, 2, 3])
1694
1695 def test_error_through_destructor(self):
1696 # Test that the exception state is not modified by a destructor,
1697 # even if close() fails.
1698 rawio = self.CloseFailureIO()
1699 def f():
1700 self.TextIOWrapper(rawio).xyzzy
1701 with support.captured_output("stderr") as s:
1702 self.assertRaises(AttributeError, f)
1703 s = s.getvalue().strip()
1704 if s:
1705 # The destructor *may* have printed an unraisable error, check it
1706 self.assertEqual(len(s.splitlines()), 1)
1707 self.assert_(s.startswith("Exception IOError: "), s)
1708 self.assert_(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00001709
Guido van Rossum9b76da62007-04-11 01:09:03 +00001710 # Systematic tests of the text I/O API
1711
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001712 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001713 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1714 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001715 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001716 f._CHUNK_SIZE = chunksize
1717 self.assertEquals(f.write("abc"), 3)
1718 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001719 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001720 f._CHUNK_SIZE = chunksize
1721 self.assertEquals(f.tell(), 0)
1722 self.assertEquals(f.read(), "abc")
1723 cookie = f.tell()
1724 self.assertEquals(f.seek(0), 0)
1725 self.assertEquals(f.read(2), "ab")
1726 self.assertEquals(f.read(1), "c")
1727 self.assertEquals(f.read(1), "")
1728 self.assertEquals(f.read(), "")
1729 self.assertEquals(f.tell(), cookie)
1730 self.assertEquals(f.seek(0), 0)
1731 self.assertEquals(f.seek(0, 2), cookie)
1732 self.assertEquals(f.write("def"), 3)
1733 self.assertEquals(f.seek(cookie), cookie)
1734 self.assertEquals(f.read(), "def")
1735 if enc.startswith("utf"):
1736 self.multi_line_test(f, enc)
1737 f.close()
1738
1739 def multi_line_test(self, f, enc):
1740 f.seek(0)
1741 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001742 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001743 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00001744 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001745 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00001746 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001747 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001748 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001749 wlines.append((f.tell(), line))
1750 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001751 f.seek(0)
1752 rlines = []
1753 while True:
1754 pos = f.tell()
1755 line = f.readline()
1756 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00001757 break
1758 rlines.append((pos, line))
Guido van Rossum9b76da62007-04-11 01:09:03 +00001759 self.assertEquals(rlines, wlines)
1760
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001761 def test_telling(self):
1762 f = self.open(support.TESTFN, "w+", encoding="utf8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001763 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001764 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001765 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001766 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001767 p2 = f.tell()
1768 f.seek(0)
1769 self.assertEquals(f.tell(), p0)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001770 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001771 self.assertEquals(f.tell(), p1)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001772 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001773 self.assertEquals(f.tell(), p2)
1774 f.seek(0)
1775 for line in f:
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001776 self.assertEquals(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001777 self.assertRaises(IOError, f.tell)
1778 self.assertEquals(f.tell(), p2)
1779 f.close()
1780
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001781 def test_seeking(self):
1782 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00001783 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001784 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00001785 prefix = bytes(u_prefix.encode("utf-8"))
1786 self.assertEquals(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001787 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00001788 suffix = bytes(u_suffix.encode("utf-8"))
1789 line = prefix + suffix
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001790 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001791 f.write(line*2)
1792 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001793 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001794 s = f.read(prefix_size)
Guido van Rossum98297ee2007-11-06 21:34:58 +00001795 self.assertEquals(s, str(prefix, "ascii"))
Guido van Rossumd76e7792007-04-17 02:38:04 +00001796 self.assertEquals(f.tell(), prefix_size)
1797 self.assertEquals(f.readline(), u_suffix)
1798
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001799 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00001800 # Regression test for a specific bug
1801 data = b'\xe0\xbf\xbf\n'
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001802 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001803 f.write(data)
1804 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001805 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001806 f._CHUNK_SIZE # Just test that it exists
1807 f._CHUNK_SIZE = 2
1808 f.readline()
1809 f.tell()
1810
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001811 def test_seek_and_tell(self):
1812 #Test seek/tell using the StatefulIncrementalDecoder.
1813 # Make test faster by doing smaller seeks
1814 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001815
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001816 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001817 """Tell/seek to various points within a data stream and ensure
1818 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001819 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001820 f.write(data)
1821 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001822 f = self.open(support.TESTFN, encoding='test_decoder')
1823 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001824 decoded = f.read()
1825 f.close()
1826
Neal Norwitze2b07052008-03-18 19:52:05 +00001827 for i in range(min_pos, len(decoded) + 1): # seek positions
1828 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001829 f = self.open(support.TESTFN, encoding='test_decoder')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001830 self.assertEquals(f.read(i), decoded[:i])
1831 cookie = f.tell()
1832 self.assertEquals(f.read(j), decoded[i:i + j])
1833 f.seek(cookie)
1834 self.assertEquals(f.read(), decoded[i:])
1835 f.close()
1836
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001837 # Enable the test decoder.
1838 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001839
1840 # Run the tests.
1841 try:
1842 # Try each test case.
1843 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001844 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001845
1846 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001847 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1848 offset = CHUNK_SIZE - len(input)//2
1849 prefix = b'.'*offset
1850 # Don't bother seeking into the prefix (takes too long).
1851 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001852 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001853
1854 # Ensure our test decoder won't interfere with subsequent tests.
1855 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001856 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001857
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001858 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001859 data = "1234567890"
1860 tests = ("utf-16",
1861 "utf-16-le",
1862 "utf-16-be",
1863 "utf-32",
1864 "utf-32-le",
1865 "utf-32-be")
1866 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001867 buf = self.BytesIO()
1868 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001869 # Check if the BOM is written only once (see issue1753).
1870 f.write(data)
1871 f.write(data)
1872 f.seek(0)
1873 self.assertEquals(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00001874 f.seek(0)
1875 self.assertEquals(f.read(), data * 2)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001876 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1877
Benjamin Petersona1b49012009-03-31 23:11:32 +00001878 def test_unreadable(self):
1879 class UnReadable(self.BytesIO):
1880 def readable(self):
1881 return False
1882 txt = self.TextIOWrapper(UnReadable())
1883 self.assertRaises(IOError, txt.read)
1884
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001885 def test_read_one_by_one(self):
1886 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001887 reads = ""
1888 while True:
1889 c = txt.read(1)
1890 if not c:
1891 break
1892 reads += c
1893 self.assertEquals(reads, "AA\nBB")
1894
1895 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001896 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001897 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001898 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001899 reads = ""
1900 while True:
1901 c = txt.read(128)
1902 if not c:
1903 break
1904 reads += c
1905 self.assertEquals(reads, "A"*127+"\nB")
1906
1907 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001908 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001909
1910 # read one char at a time
1911 reads = ""
1912 while True:
1913 c = txt.read(1)
1914 if not c:
1915 break
1916 reads += c
1917 self.assertEquals(reads, self.normalized)
1918
1919 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001920 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001921 txt._CHUNK_SIZE = 4
1922
1923 reads = ""
1924 while True:
1925 c = txt.read(4)
1926 if not c:
1927 break
1928 reads += c
1929 self.assertEquals(reads, self.normalized)
1930
1931 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001932 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001933 txt._CHUNK_SIZE = 4
1934
1935 reads = txt.read(4)
1936 reads += txt.read(4)
1937 reads += txt.readline()
1938 reads += txt.readline()
1939 reads += txt.readline()
1940 self.assertEquals(reads, self.normalized)
1941
1942 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001943 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001944 txt._CHUNK_SIZE = 4
1945
1946 reads = txt.read(4)
1947 reads += txt.read()
1948 self.assertEquals(reads, self.normalized)
1949
1950 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001951 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001952 txt._CHUNK_SIZE = 4
1953
1954 reads = txt.read(4)
1955 pos = txt.tell()
1956 txt.seek(0)
1957 txt.seek(pos)
1958 self.assertEquals(txt.read(4), "BBB\n")
1959
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00001960 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001961 buffer = self.BytesIO(self.testdata)
1962 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00001963
1964 self.assertEqual(buffer.seekable(), txt.seekable())
1965
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001966class CTextIOWrapperTest(TextIOWrapperTest):
1967
1968 def test_initialization(self):
1969 r = self.BytesIO(b"\xc3\xa9\n\n")
1970 b = self.BufferedReader(r, 1000)
1971 t = self.TextIOWrapper(b)
1972 self.assertRaises(TypeError, t.__init__, b, newline=42)
1973 self.assertRaises(ValueError, t.read)
1974 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1975 self.assertRaises(ValueError, t.read)
1976
1977 def test_garbage_collection(self):
1978 # C TextIOWrapper objects are collected, and collecting them flushes
1979 # all data to disk.
1980 # The Python version has __del__, so it ends in gc.garbage instead.
1981 rawio = io.FileIO(support.TESTFN, "wb")
1982 b = self.BufferedWriter(rawio)
1983 t = self.TextIOWrapper(b, encoding="ascii")
1984 t.write("456def")
1985 t.x = t
1986 wr = weakref.ref(t)
1987 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001988 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001989 self.assert_(wr() is None, wr)
1990 with open(support.TESTFN, "rb") as f:
1991 self.assertEqual(f.read(), b"456def")
1992
1993class PyTextIOWrapperTest(TextIOWrapperTest):
1994 pass
1995
1996
1997class IncrementalNewlineDecoderTest(unittest.TestCase):
1998
1999 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002000 # UTF-8 specific tests for a newline decoder
2001 def _check_decode(b, s, **kwargs):
2002 # We exercise getstate() / setstate() as well as decode()
2003 state = decoder.getstate()
2004 self.assertEquals(decoder.decode(b, **kwargs), s)
2005 decoder.setstate(state)
2006 self.assertEquals(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002007
Antoine Pitrou180a3362008-12-14 16:36:46 +00002008 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002009
Antoine Pitrou180a3362008-12-14 16:36:46 +00002010 _check_decode(b'\xe8', "")
2011 _check_decode(b'\xa2', "")
2012 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002013
Antoine Pitrou180a3362008-12-14 16:36:46 +00002014 _check_decode(b'\xe8', "")
2015 _check_decode(b'\xa2', "")
2016 _check_decode(b'\x88', "\u8888")
2017
2018 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002019 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2020
Antoine Pitrou180a3362008-12-14 16:36:46 +00002021 decoder.reset()
2022 _check_decode(b'\n', "\n")
2023 _check_decode(b'\r', "")
2024 _check_decode(b'', "\n", final=True)
2025 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002026
Antoine Pitrou180a3362008-12-14 16:36:46 +00002027 _check_decode(b'\r', "")
2028 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002029
Antoine Pitrou180a3362008-12-14 16:36:46 +00002030 _check_decode(b'\r\r\n', "\n\n")
2031 _check_decode(b'\r', "")
2032 _check_decode(b'\r', "\n")
2033 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002034
Antoine Pitrou180a3362008-12-14 16:36:46 +00002035 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2036 _check_decode(b'\xe8\xa2\x88', "\u8888")
2037 _check_decode(b'\n', "\n")
2038 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2039 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002040
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002041 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002042 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002043 if encoding is not None:
2044 encoder = codecs.getincrementalencoder(encoding)()
2045 def _decode_bytewise(s):
2046 # Decode one byte at a time
2047 for b in encoder.encode(s):
2048 result.append(decoder.decode(bytes([b])))
2049 else:
2050 encoder = None
2051 def _decode_bytewise(s):
2052 # Decode one char at a time
2053 for c in s:
2054 result.append(decoder.decode(c))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002055 self.assertEquals(decoder.newlines, None)
2056 _decode_bytewise("abc\n\r")
2057 self.assertEquals(decoder.newlines, '\n')
2058 _decode_bytewise("\nabc")
2059 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2060 _decode_bytewise("abc\r")
2061 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2062 _decode_bytewise("abc")
2063 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
2064 _decode_bytewise("abc\r")
2065 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
2066 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002067 input = "abc"
2068 if encoder is not None:
2069 encoder.reset()
2070 input = encoder.encode(input)
2071 self.assertEquals(decoder.decode(input), "abc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002072 self.assertEquals(decoder.newlines, None)
2073
2074 def test_newline_decoder(self):
2075 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002076 # None meaning the IncrementalNewlineDecoder takes unicode input
2077 # rather than bytes input
2078 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002079 'utf-16', 'utf-16-le', 'utf-16-be',
2080 'utf-32', 'utf-32-le', 'utf-32-be',
2081 )
2082 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002083 decoder = enc and codecs.getincrementaldecoder(enc)()
2084 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2085 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002086 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002087 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2088 self.check_newline_decoding_utf8(decoder)
2089
Antoine Pitrou66913e22009-03-06 23:40:56 +00002090 def test_newline_bytes(self):
2091 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2092 def _check(dec):
2093 self.assertEquals(dec.newlines, None)
2094 self.assertEquals(dec.decode("\u0D00"), "\u0D00")
2095 self.assertEquals(dec.newlines, None)
2096 self.assertEquals(dec.decode("\u0A00"), "\u0A00")
2097 self.assertEquals(dec.newlines, None)
2098 dec = self.IncrementalNewlineDecoder(None, translate=False)
2099 _check(dec)
2100 dec = self.IncrementalNewlineDecoder(None, translate=True)
2101 _check(dec)
2102
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002103class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2104 pass
2105
2106class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2107 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002108
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002109
Guido van Rossum01a27522007-03-07 01:00:12 +00002110# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002111
Guido van Rossum5abbf752007-08-27 17:39:33 +00002112class MiscIOTest(unittest.TestCase):
2113
Barry Warsaw40e82462008-11-20 20:14:50 +00002114 def tearDown(self):
2115 support.unlink(support.TESTFN)
2116
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002117 def test___all__(self):
2118 for name in self.io.__all__:
2119 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002120 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002121 if name == "open":
2122 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002123 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002124 self.assertTrue(issubclass(obj, Exception), name)
2125 elif not name.startswith("SEEK_"):
2126 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002127
Barry Warsaw40e82462008-11-20 20:14:50 +00002128 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002129 f = self.open(support.TESTFN, "wb", buffering=0)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002130 self.assertEquals(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002131 f.close()
2132
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002133 f = self.open(support.TESTFN, "U")
Barry Warsaw40e82462008-11-20 20:14:50 +00002134 self.assertEquals(f.name, support.TESTFN)
2135 self.assertEquals(f.buffer.name, support.TESTFN)
2136 self.assertEquals(f.buffer.raw.name, support.TESTFN)
2137 self.assertEquals(f.mode, "U")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002138 self.assertEquals(f.buffer.mode, "rb")
2139 self.assertEquals(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002140 f.close()
2141
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002142 f = self.open(support.TESTFN, "w+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002143 self.assertEquals(f.mode, "w+")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002144 self.assertEquals(f.buffer.mode, "rb+") # Does it really matter?
2145 self.assertEquals(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002146
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002147 g = self.open(f.fileno(), "wb", closefd=False)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002148 self.assertEquals(g.mode, "wb")
2149 self.assertEquals(g.raw.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002150 self.assertEquals(g.name, f.fileno())
2151 self.assertEquals(g.raw.name, f.fileno())
2152 f.close()
2153 g.close()
2154
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002155 def test_io_after_close(self):
2156 for kwargs in [
2157 {"mode": "w"},
2158 {"mode": "wb"},
2159 {"mode": "w", "buffering": 1},
2160 {"mode": "w", "buffering": 2},
2161 {"mode": "wb", "buffering": 0},
2162 {"mode": "r"},
2163 {"mode": "rb"},
2164 {"mode": "r", "buffering": 1},
2165 {"mode": "r", "buffering": 2},
2166 {"mode": "rb", "buffering": 0},
2167 {"mode": "w+"},
2168 {"mode": "w+b"},
2169 {"mode": "w+", "buffering": 1},
2170 {"mode": "w+", "buffering": 2},
2171 {"mode": "w+b", "buffering": 0},
2172 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002173 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002174 f.close()
2175 self.assertRaises(ValueError, f.flush)
2176 self.assertRaises(ValueError, f.fileno)
2177 self.assertRaises(ValueError, f.isatty)
2178 self.assertRaises(ValueError, f.__iter__)
2179 if hasattr(f, "peek"):
2180 self.assertRaises(ValueError, f.peek, 1)
2181 self.assertRaises(ValueError, f.read)
2182 if hasattr(f, "read1"):
2183 self.assertRaises(ValueError, f.read1, 1024)
2184 if hasattr(f, "readinto"):
2185 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2186 self.assertRaises(ValueError, f.readline)
2187 self.assertRaises(ValueError, f.readlines)
2188 self.assertRaises(ValueError, f.seek, 0)
2189 self.assertRaises(ValueError, f.tell)
2190 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002191 self.assertRaises(ValueError, f.write,
2192 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002193 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002194 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002195
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002196 def test_blockingioerror(self):
2197 # Various BlockingIOError issues
2198 self.assertRaises(TypeError, self.BlockingIOError)
2199 self.assertRaises(TypeError, self.BlockingIOError, 1)
2200 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2201 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2202 b = self.BlockingIOError(1, "")
2203 self.assertEqual(b.characters_written, 0)
2204 class C(str):
2205 pass
2206 c = C("")
2207 b = self.BlockingIOError(1, c)
2208 c.b = b
2209 b.c = c
2210 wr = weakref.ref(c)
2211 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002212 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002213 self.assert_(wr() is None, wr)
2214
2215 def test_abcs(self):
2216 # Test the visible base classes are ABCs.
2217 self.assertTrue(isinstance(self.IOBase, abc.ABCMeta))
2218 self.assertTrue(isinstance(self.RawIOBase, abc.ABCMeta))
2219 self.assertTrue(isinstance(self.BufferedIOBase, abc.ABCMeta))
2220 self.assertTrue(isinstance(self.TextIOBase, abc.ABCMeta))
2221
2222 def _check_abc_inheritance(self, abcmodule):
2223 with self.open(support.TESTFN, "wb", buffering=0) as f:
2224 self.assertTrue(isinstance(f, abcmodule.IOBase))
2225 self.assertTrue(isinstance(f, abcmodule.RawIOBase))
2226 self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
2227 self.assertFalse(isinstance(f, abcmodule.TextIOBase))
2228 with self.open(support.TESTFN, "wb") as f:
2229 self.assertTrue(isinstance(f, abcmodule.IOBase))
2230 self.assertFalse(isinstance(f, abcmodule.RawIOBase))
2231 self.assertTrue(isinstance(f, abcmodule.BufferedIOBase))
2232 self.assertFalse(isinstance(f, abcmodule.TextIOBase))
2233 with self.open(support.TESTFN, "w") as f:
2234 self.assertTrue(isinstance(f, abcmodule.IOBase))
2235 self.assertFalse(isinstance(f, abcmodule.RawIOBase))
2236 self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
2237 self.assertTrue(isinstance(f, abcmodule.TextIOBase))
2238
2239 def test_abc_inheritance(self):
2240 # Test implementations inherit from their respective ABCs
2241 self._check_abc_inheritance(self)
2242
2243 def test_abc_inheritance_official(self):
2244 # Test implementations inherit from the official ABCs of the
2245 # baseline "io" module.
2246 self._check_abc_inheritance(io)
2247
2248class CMiscIOTest(MiscIOTest):
2249 io = io
2250
2251class PyMiscIOTest(MiscIOTest):
2252 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002253
Guido van Rossum28524c72007-02-27 05:47:44 +00002254def test_main():
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002255 tests = (CIOTest, PyIOTest,
2256 CBufferedReaderTest, PyBufferedReaderTest,
2257 CBufferedWriterTest, PyBufferedWriterTest,
2258 CBufferedRWPairTest, PyBufferedRWPairTest,
2259 CBufferedRandomTest, PyBufferedRandomTest,
2260 StatefulIncrementalDecoderTest,
2261 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2262 CTextIOWrapperTest, PyTextIOWrapperTest,
2263 CMiscIOTest, PyMiscIOTest,)
2264
2265 # Put the namespaces of the IO module we are testing and some useful mock
2266 # classes in the __dict__ of each test.
2267 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
2268 MockNonBlockWriterIO)
2269 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2270 c_io_ns = {name : getattr(io, name) for name in all_members}
2271 py_io_ns = {name : getattr(pyio, name) for name in all_members}
2272 globs = globals()
2273 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2274 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2275 # Avoid turning open into a bound method.
2276 py_io_ns["open"] = pyio.OpenWrapper
2277 for test in tests:
2278 if test.__name__.startswith("C"):
2279 for name, obj in c_io_ns.items():
2280 setattr(test, name, obj)
2281 elif test.__name__.startswith("Py"):
2282 for name, obj in py_io_ns.items():
2283 setattr(test, name, obj)
2284
2285 support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002286
2287if __name__ == "__main__":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002288 test_main()