blob: 8241bbec87a2ba33767f5105ff76b13ca0d1248b [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import threading
27import random
Guido van Rossum28524c72007-02-27 05:47:44 +000028import unittest
Benjamin Peterson59406a92009-03-26 17:10:29 +000029import warnings
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000030import weakref
31import gc
32import abc
33from itertools import chain, cycle, count
34from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000035from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000036
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000037import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000038import io # C implementation of io
39import _pyio as pyio # Python implementation of io
Guido van Rossum28524c72007-02-27 05:47:44 +000040
Guido van Rossuma9e20242007-03-08 00:43:48 +000041
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000042def _default_chunk_size():
43 """Get the default TextIOWrapper chunk size"""
44 with open(__file__, "r", encoding="latin1") as f:
45 return f._CHUNK_SIZE
46
47
48class MockRawIO:
Guido van Rossuma9e20242007-03-08 00:43:48 +000049
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000050 def __init__(self, read_stack=()):
51 self._read_stack = list(read_stack)
52 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000053 self._reads = 0
Antoine Pitrou00091ca2010-08-11 13:38:10 +000054 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000055
56 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000057 self._reads += 1
Guido van Rossum68bbcd22007-02-27 17:19:33 +000058 try:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000059 return self._read_stack.pop(0)
Guido van Rossum68bbcd22007-02-27 17:19:33 +000060 except:
Antoine Pitrou00091ca2010-08-11 13:38:10 +000061 self._extraneous_reads += 1
Guido van Rossum78892e42007-04-06 17:31:18 +000062 return b""
Guido van Rossum68bbcd22007-02-27 17:19:33 +000063
Guido van Rossum01a27522007-03-07 01:00:12 +000064 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000065 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000066 return len(b)
67
68 def writable(self):
69 return True
70
Guido van Rossum68bbcd22007-02-27 17:19:33 +000071 def fileno(self):
72 return 42
73
74 def readable(self):
75 return True
76
Guido van Rossum01a27522007-03-07 01:00:12 +000077 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000078 return True
79
Guido van Rossum01a27522007-03-07 01:00:12 +000080 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000081 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000082
83 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000084 return 0 # same comment as above
85
86 def readinto(self, buf):
87 self._reads += 1
88 max_len = len(buf)
89 try:
90 data = self._read_stack[0]
91 except IndexError:
Antoine Pitrou00091ca2010-08-11 13:38:10 +000092 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000093 return 0
94 if data is None:
95 del self._read_stack[0]
96 return None
97 n = len(data)
98 if len(data) <= max_len:
99 del self._read_stack[0]
100 buf[:n] = data
101 return n
102 else:
103 buf[:] = data[:max_len]
104 self._read_stack[0] = data[max_len:]
105 return max_len
106
107 def truncate(self, pos=None):
108 return pos
109
110class CMockRawIO(MockRawIO, io.RawIOBase):
111 pass
112
113class PyMockRawIO(MockRawIO, pyio.RawIOBase):
114 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000115
Guido van Rossuma9e20242007-03-08 00:43:48 +0000116
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000117class MisbehavedRawIO(MockRawIO):
118 def write(self, b):
119 return super().write(b) * 2
120
121 def read(self, n=None):
122 return super().read(n) * 2
123
124 def seek(self, pos, whence):
125 return -123
126
127 def tell(self):
128 return -456
129
130 def readinto(self, buf):
131 super().readinto(buf)
132 return len(buf) * 5
133
134class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
135 pass
136
137class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
138 pass
139
140
141class CloseFailureIO(MockRawIO):
142 closed = 0
143
144 def close(self):
145 if not self.closed:
146 self.closed = 1
147 raise IOError
148
149class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
150 pass
151
152class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
153 pass
154
155
156class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000157
158 def __init__(self, data):
159 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000160 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000161
162 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000163 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000164 self.read_history.append(None if res is None else len(res))
165 return res
166
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000167 def readinto(self, b):
168 res = super().readinto(b)
169 self.read_history.append(res)
170 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000171
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000172class CMockFileIO(MockFileIO, io.BytesIO):
173 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000174
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000175class PyMockFileIO(MockFileIO, pyio.BytesIO):
176 pass
177
178
179class MockNonBlockWriterIO:
180
181 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000182 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000183 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000184
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000185 def pop_written(self):
186 s = b"".join(self._write_stack)
187 self._write_stack[:] = []
188 return s
189
190 def block_on(self, char):
191 """Block when a given char is encountered."""
192 self._blocker_char = char
193
194 def readable(self):
195 return True
196
197 def seekable(self):
198 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000199
Guido van Rossum01a27522007-03-07 01:00:12 +0000200 def writable(self):
201 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000202
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000203 def write(self, b):
204 b = bytes(b)
205 n = -1
206 if self._blocker_char:
207 try:
208 n = b.index(self._blocker_char)
209 except ValueError:
210 pass
211 else:
212 self._blocker_char = None
213 self._write_stack.append(b[:n])
214 raise self.BlockingIOError(0, "test blocking", n)
215 self._write_stack.append(b)
216 return len(b)
217
218class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
219 BlockingIOError = io.BlockingIOError
220
221class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
222 BlockingIOError = pyio.BlockingIOError
223
Guido van Rossuma9e20242007-03-08 00:43:48 +0000224
Guido van Rossum28524c72007-02-27 05:47:44 +0000225class IOTest(unittest.TestCase):
226
Neal Norwitze7789b12008-03-24 06:18:09 +0000227 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000228 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000229
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000230 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000231 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000232
Guido van Rossum28524c72007-02-27 05:47:44 +0000233 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000234 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou66f9fea2010-01-31 23:20:26 +0000235 f.truncate(0)
236 self.assertEqual(f.tell(), 5)
237 f.seek(0)
238
239 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000240 self.assertEqual(f.seek(0), 0)
241 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000242 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000243 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000244 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000245 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000246 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000247 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000248 self.assertEqual(f.seek(-1, 2), 13)
249 self.assertEqual(f.tell(), 13)
Antoine Pitrou66f9fea2010-01-31 23:20:26 +0000250
Guido van Rossum87429772007-04-10 21:06:59 +0000251 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou66f9fea2010-01-31 23:20:26 +0000252 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000253 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000254
Guido van Rossum9b76da62007-04-11 01:09:03 +0000255 def read_ops(self, f, buffered=False):
256 data = f.read(5)
257 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000258 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000259 self.assertEqual(f.readinto(data), 5)
260 self.assertEqual(data, b" worl")
261 self.assertEqual(f.readinto(data), 2)
262 self.assertEqual(len(data), 5)
263 self.assertEqual(data[:2], b"d\n")
264 self.assertEqual(f.seek(0), 0)
265 self.assertEqual(f.read(20), b"hello world\n")
266 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000267 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000268 self.assertEqual(f.seek(-6, 2), 6)
269 self.assertEqual(f.read(5), b"world")
270 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000271 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000272 self.assertEqual(f.seek(-6, 1), 5)
273 self.assertEqual(f.read(5), b" worl")
274 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000275 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000276 if buffered:
277 f.seek(0)
278 self.assertEqual(f.read(), b"hello world\n")
279 f.seek(6)
280 self.assertEqual(f.read(), b"world\n")
281 self.assertEqual(f.read(), b"")
282
Guido van Rossum34d69e52007-04-10 20:08:41 +0000283 LARGE = 2**31
284
Guido van Rossum53807da2007-04-10 19:01:47 +0000285 def large_file_ops(self, f):
286 assert f.readable()
287 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000288 self.assertEqual(f.seek(self.LARGE), self.LARGE)
289 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000290 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000291 self.assertEqual(f.tell(), self.LARGE + 3)
292 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000293 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000294 self.assertEqual(f.tell(), self.LARGE + 2)
295 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000296 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou66f9fea2010-01-31 23:20:26 +0000297 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000298 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
299 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000300 self.assertEqual(f.read(2), b"x")
301
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000302 def test_invalid_operations(self):
303 # Try writing on a file opened in read mode and vice-versa.
304 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000305 with self.open(support.TESTFN, mode) as fp:
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000306 self.assertRaises(IOError, fp.read)
307 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000308 with self.open(support.TESTFN, "rb") as fp:
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000309 self.assertRaises(IOError, fp.write, b"blah")
310 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000311 with self.open(support.TESTFN, "r") as fp:
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000312 self.assertRaises(IOError, fp.write, "blah")
313 self.assertRaises(IOError, fp.writelines, ["blah\n"])
314
Guido van Rossum28524c72007-02-27 05:47:44 +0000315 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000316 with self.open(support.TESTFN, "wb", buffering=0) as f:
317 self.assertEqual(f.readable(), False)
318 self.assertEqual(f.writable(), True)
319 self.assertEqual(f.seekable(), True)
320 self.write_ops(f)
321 with self.open(support.TESTFN, "rb", buffering=0) as f:
322 self.assertEqual(f.readable(), True)
323 self.assertEqual(f.writable(), False)
324 self.assertEqual(f.seekable(), True)
325 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000326
Guido van Rossum87429772007-04-10 21:06:59 +0000327 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000328 with self.open(support.TESTFN, "wb") as f:
329 self.assertEqual(f.readable(), False)
330 self.assertEqual(f.writable(), True)
331 self.assertEqual(f.seekable(), True)
332 self.write_ops(f)
333 with self.open(support.TESTFN, "rb") as f:
334 self.assertEqual(f.readable(), True)
335 self.assertEqual(f.writable(), False)
336 self.assertEqual(f.seekable(), True)
337 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000338
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000339 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000340 with self.open(support.TESTFN, "wb") as f:
341 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
342 with self.open(support.TESTFN, "rb") as f:
343 self.assertEqual(f.readline(), b"abc\n")
344 self.assertEqual(f.readline(10), b"def\n")
345 self.assertEqual(f.readline(2), b"xy")
346 self.assertEqual(f.readline(4), b"zzy\n")
347 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Peterson6b59f772009-12-13 19:30:15 +0000348 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000349 self.assertRaises(TypeError, f.readline, 5.3)
350 with self.open(support.TESTFN, "r") as f:
351 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000352
Guido van Rossum28524c72007-02-27 05:47:44 +0000353 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000354 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000355 self.write_ops(f)
356 data = f.getvalue()
357 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000358 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000359 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000360
Guido van Rossum53807da2007-04-10 19:01:47 +0000361 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000362 # On Windows and Mac OSX this test comsumes large resources; It takes
363 # a long time to build the >2GB file and takes >2GB of disk space
364 # therefore the resource must be enabled to run this test.
365 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000366 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000367 print("\nTesting large file ops skipped on %s." % sys.platform,
368 file=sys.stderr)
369 print("It requires %d bytes and a long time." % self.LARGE,
370 file=sys.stderr)
371 print("Use 'regrtest.py -u largefile test_io' to run it.",
372 file=sys.stderr)
373 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000374 with self.open(support.TESTFN, "w+b", 0) as f:
375 self.large_file_ops(f)
376 with self.open(support.TESTFN, "w+b") as f:
377 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000378
379 def test_with_open(self):
380 for bufsize in (0, 1, 100):
381 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000382 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000383 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000384 self.assertEqual(f.closed, True)
385 f = None
386 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000387 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000388 1/0
389 except ZeroDivisionError:
390 self.assertEqual(f.closed, True)
391 else:
392 self.fail("1/0 didn't raise an exception")
393
Antoine Pitrou08838b62009-01-21 00:55:13 +0000394 # issue 5008
395 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000396 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000397 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000398 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000399 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000400 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000401 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000402 with self.open(support.TESTFN, "a") as f:
Georg Brandlab91fde2009-08-13 08:51:18 +0000403 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000404
Guido van Rossum87429772007-04-10 21:06:59 +0000405 def test_destructor(self):
406 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000407 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000408 def __del__(self):
409 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000410 try:
411 f = super().__del__
412 except AttributeError:
413 pass
414 else:
415 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000416 def close(self):
417 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000418 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000419 def flush(self):
420 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000421 super().flush()
422 f = MyFileIO(support.TESTFN, "wb")
423 f.write(b"xxx")
424 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000425 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000426 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000427 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000428 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000429
430 def _check_base_destructor(self, base):
431 record = []
432 class MyIO(base):
433 def __init__(self):
434 # This exercises the availability of attributes on object
435 # destruction.
436 # (in the C version, close() is called by the tp_dealloc
437 # function, not by __del__)
438 self.on_del = 1
439 self.on_close = 2
440 self.on_flush = 3
441 def __del__(self):
442 record.append(self.on_del)
443 try:
444 f = super().__del__
445 except AttributeError:
446 pass
447 else:
448 f()
449 def close(self):
450 record.append(self.on_close)
451 super().close()
452 def flush(self):
453 record.append(self.on_flush)
454 super().flush()
455 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000456 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000457 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000458 self.assertEqual(record, [1, 2, 3])
459
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000460 def test_IOBase_destructor(self):
461 self._check_base_destructor(self.IOBase)
462
463 def test_RawIOBase_destructor(self):
464 self._check_base_destructor(self.RawIOBase)
465
466 def test_BufferedIOBase_destructor(self):
467 self._check_base_destructor(self.BufferedIOBase)
468
469 def test_TextIOBase_destructor(self):
470 self._check_base_destructor(self.TextIOBase)
471
Guido van Rossum87429772007-04-10 21:06:59 +0000472 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000473 with self.open(support.TESTFN, "wb") as f:
474 f.write(b"xxx")
475 with self.open(support.TESTFN, "rb") as f:
476 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000477
Guido van Rossumd4103952007-04-12 05:44:49 +0000478 def test_array_writes(self):
479 a = array.array('i', range(10))
Antoine Pitrouc3b39242009-01-03 16:59:18 +0000480 n = len(a.tostring())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000481 with self.open(support.TESTFN, "wb", 0) as f:
482 self.assertEqual(f.write(a), n)
483 with self.open(support.TESTFN, "wb") as f:
484 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000485
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000486 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000487 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000488 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000489
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000490 def test_read_closed(self):
491 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000492 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000493 with self.open(support.TESTFN, "r") as f:
494 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000495 self.assertEqual(file.read(), "egg\n")
496 file.seek(0)
497 file.close()
498 self.assertRaises(ValueError, file.read)
499
500 def test_no_closefd_with_filename(self):
501 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000502 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000503
504 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000505 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000506 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000507 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000508 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000509 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000510 self.assertEqual(file.buffer.raw.closefd, False)
511
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000512 def test_garbage_collection(self):
513 # FileIO objects are collected, and collecting them flushes
514 # all data to disk.
515 f = self.FileIO(support.TESTFN, "wb")
516 f.write(b"abcxxx")
517 f.f = f
518 wr = weakref.ref(f)
519 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000520 support.gc_collect()
Georg Brandlab91fde2009-08-13 08:51:18 +0000521 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000522 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000523 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000524
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000525 def test_unbounded_file(self):
526 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
527 zero = "/dev/zero"
528 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000529 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000530 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000531 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000532 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000533 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000534 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000535 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000536 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000537 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000538 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000539 self.assertRaises(OverflowError, f.read)
540
Antoine Pitroufaf90072010-05-03 16:58:19 +0000541 def test_flush_error_on_close(self):
542 f = self.open(support.TESTFN, "wb", buffering=0)
543 def bad_flush():
544 raise IOError()
545 f.flush = bad_flush
546 self.assertRaises(IOError, f.close) # exception not swallowed
547
548 def test_multi_close(self):
549 f = self.open(support.TESTFN, "wb", buffering=0)
550 f.close()
551 f.close()
552 f.close()
553 self.assertRaises(ValueError, f.flush)
554
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000555class CIOTest(IOTest):
556 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000557
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000558class PyIOTest(IOTest):
559 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000560
Guido van Rossuma9e20242007-03-08 00:43:48 +0000561
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000562class CommonBufferedTests:
563 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
564
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000565 def test_detach(self):
566 raw = self.MockRawIO()
567 buf = self.tp(raw)
568 self.assertIs(buf.detach(), raw)
569 self.assertRaises(ValueError, buf.detach)
570
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000571 def test_fileno(self):
572 rawio = self.MockRawIO()
573 bufio = self.tp(rawio)
574
575 self.assertEquals(42, bufio.fileno())
576
577 def test_no_fileno(self):
578 # XXX will we always have fileno() function? If so, kill
579 # this test. Else, write it.
580 pass
581
582 def test_invalid_args(self):
583 rawio = self.MockRawIO()
584 bufio = self.tp(rawio)
585 # Invalid whence
586 self.assertRaises(ValueError, bufio.seek, 0, -1)
587 self.assertRaises(ValueError, bufio.seek, 0, 3)
588
589 def test_override_destructor(self):
590 tp = self.tp
591 record = []
592 class MyBufferedIO(tp):
593 def __del__(self):
594 record.append(1)
595 try:
596 f = super().__del__
597 except AttributeError:
598 pass
599 else:
600 f()
601 def close(self):
602 record.append(2)
603 super().close()
604 def flush(self):
605 record.append(3)
606 super().flush()
607 rawio = self.MockRawIO()
608 bufio = MyBufferedIO(rawio)
609 writable = bufio.writable()
610 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000611 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000612 if writable:
613 self.assertEqual(record, [1, 2, 3])
614 else:
615 self.assertEqual(record, [1, 2])
616
617 def test_context_manager(self):
618 # Test usability as a context manager
619 rawio = self.MockRawIO()
620 bufio = self.tp(rawio)
621 def _with():
622 with bufio:
623 pass
624 _with()
625 # bufio should now be closed, and using it a second time should raise
626 # a ValueError.
627 self.assertRaises(ValueError, _with)
628
629 def test_error_through_destructor(self):
630 # Test that the exception state is not modified by a destructor,
631 # even if close() fails.
632 rawio = self.CloseFailureIO()
633 def f():
634 self.tp(rawio).xyzzy
635 with support.captured_output("stderr") as s:
636 self.assertRaises(AttributeError, f)
637 s = s.getvalue().strip()
638 if s:
639 # The destructor *may* have printed an unraisable error, check it
640 self.assertEqual(len(s.splitlines()), 1)
Georg Brandlab91fde2009-08-13 08:51:18 +0000641 self.assertTrue(s.startswith("Exception IOError: "), s)
642 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000643
Antoine Pitrou716c4442009-05-23 19:04:03 +0000644 def test_repr(self):
645 raw = self.MockRawIO()
646 b = self.tp(raw)
647 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
648 self.assertEqual(repr(b), "<%s>" % clsname)
649 raw.name = "dummy"
650 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
651 raw.name = b"dummy"
652 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
653
Antoine Pitroufaf90072010-05-03 16:58:19 +0000654 def test_flush_error_on_close(self):
655 raw = self.MockRawIO()
656 def bad_flush():
657 raise IOError()
658 raw.flush = bad_flush
659 b = self.tp(raw)
660 self.assertRaises(IOError, b.close) # exception not swallowed
661
662 def test_multi_close(self):
663 raw = self.MockRawIO()
664 b = self.tp(raw)
665 b.close()
666 b.close()
667 b.close()
668 self.assertRaises(ValueError, b.flush)
669
Guido van Rossum78892e42007-04-06 17:31:18 +0000670
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000671class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
672 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000673
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000674 def test_constructor(self):
675 rawio = self.MockRawIO([b"abc"])
676 bufio = self.tp(rawio)
677 bufio.__init__(rawio)
678 bufio.__init__(rawio, buffer_size=1024)
679 bufio.__init__(rawio, buffer_size=16)
680 self.assertEquals(b"abc", bufio.read())
681 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
682 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
683 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
684 rawio = self.MockRawIO([b"abc"])
685 bufio.__init__(rawio)
686 self.assertEquals(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000687
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000688 def test_read(self):
Benjamin Peterson6b59f772009-12-13 19:30:15 +0000689 for arg in (None, 7):
690 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
691 bufio = self.tp(rawio)
692 self.assertEquals(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000693 # Invalid args
694 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000695
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000696 def test_read1(self):
697 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
698 bufio = self.tp(rawio)
699 self.assertEquals(b"a", bufio.read(1))
700 self.assertEquals(b"b", bufio.read1(1))
701 self.assertEquals(rawio._reads, 1)
702 self.assertEquals(b"c", bufio.read1(100))
703 self.assertEquals(rawio._reads, 1)
704 self.assertEquals(b"d", bufio.read1(100))
705 self.assertEquals(rawio._reads, 2)
706 self.assertEquals(b"efg", bufio.read1(100))
707 self.assertEquals(rawio._reads, 3)
708 self.assertEquals(b"", bufio.read1(100))
Benjamin Peterson6b59f772009-12-13 19:30:15 +0000709 self.assertEquals(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000710 # Invalid args
711 self.assertRaises(ValueError, bufio.read1, -1)
712
713 def test_readinto(self):
714 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
715 bufio = self.tp(rawio)
716 b = bytearray(2)
717 self.assertEquals(bufio.readinto(b), 2)
718 self.assertEquals(b, b"ab")
719 self.assertEquals(bufio.readinto(b), 2)
720 self.assertEquals(b, b"cd")
721 self.assertEquals(bufio.readinto(b), 2)
722 self.assertEquals(b, b"ef")
723 self.assertEquals(bufio.readinto(b), 1)
724 self.assertEquals(b, b"gf")
725 self.assertEquals(bufio.readinto(b), 0)
726 self.assertEquals(b, b"gf")
727
Benjamin Peterson6b59f772009-12-13 19:30:15 +0000728 def test_readlines(self):
729 def bufio():
730 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
731 return self.tp(rawio)
732 self.assertEquals(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
733 self.assertEquals(bufio().readlines(5), [b"abc\n", b"d\n"])
734 self.assertEquals(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
735
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000736 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000737 data = b"abcdefghi"
738 dlen = len(data)
739
740 tests = [
741 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
742 [ 100, [ 3, 3, 3], [ dlen ] ],
743 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
744 ]
745
746 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000747 rawio = self.MockFileIO(data)
748 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000749 pos = 0
750 for nbytes in buf_read_sizes:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000751 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000752 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000753 # this is mildly implementation-dependent
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000754 self.assertEquals(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000755
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000756 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000757 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000758 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
759 bufio = self.tp(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000760
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000761 self.assertEquals(b"abcd", bufio.read(6))
762 self.assertEquals(b"e", bufio.read(1))
763 self.assertEquals(b"fg", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000764 self.assertEquals(b"", bufio.peek(1))
Georg Brandlab91fde2009-08-13 08:51:18 +0000765 self.assertTrue(None is bufio.read())
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000766 self.assertEquals(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000767
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000768 def test_read_past_eof(self):
769 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
770 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000771
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000772 self.assertEquals(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000773
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000774 def test_read_all(self):
775 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
776 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000777
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000778 self.assertEquals(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000779
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000780 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000781 try:
782 # Write out many bytes with exactly the same number of 0's,
783 # 1's... 255's. This will help us check that concurrent reading
784 # doesn't duplicate or forget contents.
785 N = 1000
786 l = list(range(256)) * N
787 random.shuffle(l)
788 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000789 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +0000790 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000791 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000792 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000793 errors = []
794 results = []
795 def f():
796 try:
797 # Intra-buffer read then buffer-flushing read
798 for n in cycle([1, 19]):
799 s = bufio.read(n)
800 if not s:
801 break
802 # list.append() is atomic
803 results.append(s)
804 except Exception as e:
805 errors.append(e)
806 raise
807 threads = [threading.Thread(target=f) for x in range(20)]
808 for t in threads:
809 t.start()
810 time.sleep(0.02) # yield
811 for t in threads:
812 t.join()
813 self.assertFalse(errors,
814 "the following exceptions were caught: %r" % errors)
815 s = b''.join(results)
816 for i in range(256):
817 c = bytes(bytearray([i]))
818 self.assertEqual(s.count(c), N)
819 finally:
820 support.unlink(support.TESTFN)
821
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000822 def test_misbehaved_io(self):
823 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
824 bufio = self.tp(rawio)
825 self.assertRaises(IOError, bufio.seek, 0)
826 self.assertRaises(IOError, bufio.tell)
827
Antoine Pitrou00091ca2010-08-11 13:38:10 +0000828 def test_no_extraneous_read(self):
829 # Issue #9550; when the raw IO object has satisfied the read request,
830 # we should not issue any additional reads, otherwise it may block
831 # (e.g. socket).
832 bufsize = 16
833 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
834 rawio = self.MockRawIO([b"x" * n])
835 bufio = self.tp(rawio, bufsize)
836 self.assertEqual(bufio.read(n), b"x" * n)
837 # Simple case: one raw read is enough to satisfy the request.
838 self.assertEqual(rawio._extraneous_reads, 0,
839 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
840 # A more complex case where two raw reads are needed to satisfy
841 # the request.
842 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
843 bufio = self.tp(rawio, bufsize)
844 self.assertEqual(bufio.read(n), b"x" * n)
845 self.assertEqual(rawio._extraneous_reads, 0,
846 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
847
848
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000849class CBufferedReaderTest(BufferedReaderTest):
850 tp = io.BufferedReader
851
852 def test_constructor(self):
853 BufferedReaderTest.test_constructor(self)
854 # The allocation can succeed on 32-bit builds, e.g. with more
855 # than 2GB RAM and a 64-bit kernel.
856 if sys.maxsize > 0x7FFFFFFF:
857 rawio = self.MockRawIO()
858 bufio = self.tp(rawio)
859 self.assertRaises((OverflowError, MemoryError, ValueError),
860 bufio.__init__, rawio, sys.maxsize)
861
862 def test_initialization(self):
863 rawio = self.MockRawIO([b"abc"])
864 bufio = self.tp(rawio)
865 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
866 self.assertRaises(ValueError, bufio.read)
867 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
868 self.assertRaises(ValueError, bufio.read)
869 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
870 self.assertRaises(ValueError, bufio.read)
871
872 def test_misbehaved_io_read(self):
873 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
874 bufio = self.tp(rawio)
875 # _pyio.BufferedReader seems to implement reading different, so that
876 # checking this is not so easy.
877 self.assertRaises(IOError, bufio.read, 10)
878
879 def test_garbage_collection(self):
880 # C BufferedReader objects are collected.
881 # The Python version has __del__, so it ends into gc.garbage instead
882 rawio = self.FileIO(support.TESTFN, "w+b")
883 f = self.tp(rawio)
884 f.f = f
885 wr = weakref.ref(f)
886 del f
Benjamin Peterson45cec322009-04-24 23:14:50 +0000887 support.gc_collect()
Georg Brandlab91fde2009-08-13 08:51:18 +0000888 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000889
890class PyBufferedReaderTest(BufferedReaderTest):
891 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000892
Guido van Rossuma9e20242007-03-08 00:43:48 +0000893
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000894class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
895 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +0000896
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000897 def test_constructor(self):
898 rawio = self.MockRawIO()
899 bufio = self.tp(rawio)
900 bufio.__init__(rawio)
901 bufio.__init__(rawio, buffer_size=1024)
902 bufio.__init__(rawio, buffer_size=16)
903 self.assertEquals(3, bufio.write(b"abc"))
904 bufio.flush()
905 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
906 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
907 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
908 bufio.__init__(rawio)
909 self.assertEquals(3, bufio.write(b"ghi"))
910 bufio.flush()
911 self.assertEquals(b"".join(rawio._write_stack), b"abcghi")
912
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000913 def test_detach_flush(self):
914 raw = self.MockRawIO()
915 buf = self.tp(raw)
916 buf.write(b"howdy!")
917 self.assertFalse(raw._write_stack)
918 buf.detach()
919 self.assertEqual(raw._write_stack, [b"howdy!"])
920
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000921 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000922 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000923 writer = self.MockRawIO()
924 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000925 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000926 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000927
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000928 def test_write_overflow(self):
929 writer = self.MockRawIO()
930 bufio = self.tp(writer, 8)
931 contents = b"abcdefghijklmnop"
932 for n in range(0, len(contents), 3):
933 bufio.write(contents[n:n+3])
934 flushed = b"".join(writer._write_stack)
935 # At least (total - 8) bytes were implicitly flushed, perhaps more
936 # depending on the implementation.
Georg Brandlab91fde2009-08-13 08:51:18 +0000937 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000938
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000939 def check_writes(self, intermediate_func):
940 # Lots of writes, test the flushed output is as expected.
941 contents = bytes(range(256)) * 1000
942 n = 0
943 writer = self.MockRawIO()
944 bufio = self.tp(writer, 13)
945 # Generator of write sizes: repeat each N 15 times then proceed to N+1
946 def gen_sizes():
947 for size in count(1):
948 for i in range(15):
949 yield size
950 sizes = gen_sizes()
951 while n < len(contents):
952 size = min(next(sizes), len(contents) - n)
953 self.assertEquals(bufio.write(contents[n:n+size]), size)
954 intermediate_func(bufio)
955 n += size
956 bufio.flush()
957 self.assertEquals(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000958
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000959 def test_writes(self):
960 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000961
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000962 def test_writes_and_flushes(self):
963 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +0000964
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000965 def test_writes_and_seeks(self):
966 def _seekabs(bufio):
967 pos = bufio.tell()
968 bufio.seek(pos + 1, 0)
969 bufio.seek(pos - 1, 0)
970 bufio.seek(pos, 0)
971 self.check_writes(_seekabs)
972 def _seekrel(bufio):
973 pos = bufio.seek(0, 1)
974 bufio.seek(+1, 1)
975 bufio.seek(-1, 1)
976 bufio.seek(pos, 0)
977 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +0000978
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000979 def test_writes_and_truncates(self):
980 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +0000981
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000982 def test_write_non_blocking(self):
983 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +0000984 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +0000985
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000986 self.assertEquals(bufio.write(b"abcd"), 4)
987 self.assertEquals(bufio.write(b"efghi"), 5)
988 # 1 byte will be written, the rest will be buffered
989 raw.block_on(b"k")
990 self.assertEquals(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +0000991
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000992 # 8 bytes will be written, 8 will be buffered and the rest will be lost
993 raw.block_on(b"0")
994 try:
995 bufio.write(b"opqrwxyz0123456789")
996 except self.BlockingIOError as e:
997 written = e.characters_written
998 else:
999 self.fail("BlockingIOError should have been raised")
1000 self.assertEquals(written, 16)
1001 self.assertEquals(raw.pop_written(),
1002 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001003
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001004 self.assertEquals(bufio.write(b"ABCDEFGHI"), 9)
1005 s = raw.pop_written()
1006 # Previously buffered bytes were flushed
1007 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001008
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001009 def test_write_and_rewind(self):
1010 raw = io.BytesIO()
1011 bufio = self.tp(raw, 4)
1012 self.assertEqual(bufio.write(b"abcdef"), 6)
1013 self.assertEqual(bufio.tell(), 6)
1014 bufio.seek(0, 0)
1015 self.assertEqual(bufio.write(b"XY"), 2)
1016 bufio.seek(6, 0)
1017 self.assertEqual(raw.getvalue(), b"XYcdef")
1018 self.assertEqual(bufio.write(b"123456"), 6)
1019 bufio.flush()
1020 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001021
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001022 def test_flush(self):
1023 writer = self.MockRawIO()
1024 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001025 bufio.write(b"abc")
1026 bufio.flush()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001027 self.assertEquals(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001028
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001029 def test_destructor(self):
1030 writer = self.MockRawIO()
1031 bufio = self.tp(writer, 8)
1032 bufio.write(b"abc")
1033 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001034 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001035 self.assertEquals(b"abc", writer._write_stack[0])
1036
1037 def test_truncate(self):
1038 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001039 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001040 bufio = self.tp(raw, 8)
1041 bufio.write(b"abcdef")
1042 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou66f9fea2010-01-31 23:20:26 +00001043 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001044 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001045 self.assertEqual(f.read(), b"abc")
1046
1047 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001048 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001049 # Write out many bytes from many threads and test they were
1050 # all flushed.
1051 N = 1000
1052 contents = bytes(range(256)) * N
1053 sizes = cycle([1, 19])
1054 n = 0
1055 queue = deque()
1056 while n < len(contents):
1057 size = next(sizes)
1058 queue.append(contents[n:n+size])
1059 n += size
1060 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001061 # We use a real file object because it allows us to
1062 # exercise situations where the GIL is released before
1063 # writing the buffer to the raw streams. This is in addition
1064 # to concurrency issues due to switching threads in the middle
1065 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001066 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001067 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001068 errors = []
1069 def f():
1070 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001071 while True:
1072 try:
1073 s = queue.popleft()
1074 except IndexError:
1075 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001076 bufio.write(s)
1077 except Exception as e:
1078 errors.append(e)
1079 raise
1080 threads = [threading.Thread(target=f) for x in range(20)]
1081 for t in threads:
1082 t.start()
1083 time.sleep(0.02) # yield
1084 for t in threads:
1085 t.join()
1086 self.assertFalse(errors,
1087 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001088 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001089 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001090 s = f.read()
1091 for i in range(256):
1092 self.assertEquals(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001093 finally:
1094 support.unlink(support.TESTFN)
1095
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001096 def test_misbehaved_io(self):
1097 rawio = self.MisbehavedRawIO()
1098 bufio = self.tp(rawio, 5)
1099 self.assertRaises(IOError, bufio.seek, 0)
1100 self.assertRaises(IOError, bufio.tell)
1101 self.assertRaises(IOError, bufio.write, b"abcdef")
1102
Benjamin Peterson59406a92009-03-26 17:10:29 +00001103 def test_max_buffer_size_deprecation(self):
1104 with support.check_warnings() as w:
1105 warnings.simplefilter("always", DeprecationWarning)
1106 self.tp(self.MockRawIO(), 8, 12)
1107 self.assertEqual(len(w.warnings), 1)
1108 warning = w.warnings[0]
1109 self.assertTrue(warning.category is DeprecationWarning)
1110 self.assertEqual(str(warning.message),
1111 "max_buffer_size is deprecated")
1112
1113
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001114class CBufferedWriterTest(BufferedWriterTest):
1115 tp = io.BufferedWriter
1116
1117 def test_constructor(self):
1118 BufferedWriterTest.test_constructor(self)
1119 # The allocation can succeed on 32-bit builds, e.g. with more
1120 # than 2GB RAM and a 64-bit kernel.
1121 if sys.maxsize > 0x7FFFFFFF:
1122 rawio = self.MockRawIO()
1123 bufio = self.tp(rawio)
1124 self.assertRaises((OverflowError, MemoryError, ValueError),
1125 bufio.__init__, rawio, sys.maxsize)
1126
1127 def test_initialization(self):
1128 rawio = self.MockRawIO()
1129 bufio = self.tp(rawio)
1130 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1131 self.assertRaises(ValueError, bufio.write, b"def")
1132 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1133 self.assertRaises(ValueError, bufio.write, b"def")
1134 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1135 self.assertRaises(ValueError, bufio.write, b"def")
1136
1137 def test_garbage_collection(self):
1138 # C BufferedWriter objects are collected, and collecting them flushes
1139 # all data to disk.
1140 # The Python version has __del__, so it ends into gc.garbage instead
1141 rawio = self.FileIO(support.TESTFN, "w+b")
1142 f = self.tp(rawio)
1143 f.write(b"123xxx")
1144 f.x = f
1145 wr = weakref.ref(f)
1146 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001147 support.gc_collect()
Georg Brandlab91fde2009-08-13 08:51:18 +00001148 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001149 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001150 self.assertEqual(f.read(), b"123xxx")
1151
1152
1153class PyBufferedWriterTest(BufferedWriterTest):
1154 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001155
Guido van Rossum01a27522007-03-07 01:00:12 +00001156class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001157
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001158 def test_constructor(self):
1159 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001160 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001161
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001162 def test_detach(self):
1163 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1164 self.assertRaises(self.UnsupportedOperation, pair.detach)
1165
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001166 def test_constructor_max_buffer_size_deprecation(self):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001167 with support.check_warnings() as w:
1168 warnings.simplefilter("always", DeprecationWarning)
1169 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
1170 self.assertEqual(len(w.warnings), 1)
1171 warning = w.warnings[0]
1172 self.assertTrue(warning.category is DeprecationWarning)
1173 self.assertEqual(str(warning.message),
1174 "max_buffer_size is deprecated")
1175
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001176 def test_constructor_with_not_readable(self):
1177 class NotReadable(MockRawIO):
1178 def readable(self):
1179 return False
1180
1181 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1182
1183 def test_constructor_with_not_writeable(self):
1184 class NotWriteable(MockRawIO):
1185 def writable(self):
1186 return False
1187
1188 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1189
1190 def test_read(self):
1191 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1192
1193 self.assertEqual(pair.read(3), b"abc")
1194 self.assertEqual(pair.read(1), b"d")
1195 self.assertEqual(pair.read(), b"ef")
Benjamin Peterson6b59f772009-12-13 19:30:15 +00001196 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1197 self.assertEqual(pair.read(None), b"abc")
1198
1199 def test_readlines(self):
1200 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1201 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1202 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1203 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001204
1205 def test_read1(self):
1206 # .read1() is delegated to the underlying reader object, so this test
1207 # can be shallow.
1208 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1209
1210 self.assertEqual(pair.read1(3), b"abc")
1211
1212 def test_readinto(self):
1213 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1214
1215 data = bytearray(5)
1216 self.assertEqual(pair.readinto(data), 5)
1217 self.assertEqual(data, b"abcde")
1218
1219 def test_write(self):
1220 w = self.MockRawIO()
1221 pair = self.tp(self.MockRawIO(), w)
1222
1223 pair.write(b"abc")
1224 pair.flush()
1225 pair.write(b"def")
1226 pair.flush()
1227 self.assertEqual(w._write_stack, [b"abc", b"def"])
1228
1229 def test_peek(self):
1230 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1231
1232 self.assertTrue(pair.peek(3).startswith(b"abc"))
1233 self.assertEqual(pair.read(3), b"abc")
1234
1235 def test_readable(self):
1236 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1237 self.assertTrue(pair.readable())
1238
1239 def test_writeable(self):
1240 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1241 self.assertTrue(pair.writable())
1242
1243 def test_seekable(self):
1244 # BufferedRWPairs are never seekable, even if their readers and writers
1245 # are.
1246 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1247 self.assertFalse(pair.seekable())
1248
1249 # .flush() is delegated to the underlying writer object and has been
1250 # tested in the test_write method.
1251
1252 def test_close_and_closed(self):
1253 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1254 self.assertFalse(pair.closed)
1255 pair.close()
1256 self.assertTrue(pair.closed)
1257
1258 def test_isatty(self):
1259 class SelectableIsAtty(MockRawIO):
1260 def __init__(self, isatty):
1261 MockRawIO.__init__(self)
1262 self._isatty = isatty
1263
1264 def isatty(self):
1265 return self._isatty
1266
1267 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1268 self.assertFalse(pair.isatty())
1269
1270 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1271 self.assertTrue(pair.isatty())
1272
1273 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1274 self.assertTrue(pair.isatty())
1275
1276 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1277 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001278
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001279class CBufferedRWPairTest(BufferedRWPairTest):
1280 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001281
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001282class PyBufferedRWPairTest(BufferedRWPairTest):
1283 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001284
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001285
1286class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1287 read_mode = "rb+"
1288 write_mode = "wb+"
1289
1290 def test_constructor(self):
1291 BufferedReaderTest.test_constructor(self)
1292 BufferedWriterTest.test_constructor(self)
1293
1294 def test_read_and_write(self):
1295 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001296 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001297
1298 self.assertEqual(b"as", rw.read(2))
1299 rw.write(b"ddd")
1300 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001301 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001302 self.assertEqual(b"ghjk", rw.read())
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001303 self.assertEquals(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001304
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001305 def test_seek_and_tell(self):
1306 raw = self.BytesIO(b"asdfghjkl")
1307 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001308
1309 self.assertEquals(b"as", rw.read(2))
1310 self.assertEquals(2, rw.tell())
1311 rw.seek(0, 0)
1312 self.assertEquals(b"asdf", rw.read(4))
1313
1314 rw.write(b"asdf")
1315 rw.seek(0, 0)
1316 self.assertEquals(b"asdfasdfl", rw.read())
1317 self.assertEquals(9, rw.tell())
1318 rw.seek(-4, 2)
1319 self.assertEquals(5, rw.tell())
1320 rw.seek(2, 1)
1321 self.assertEquals(7, rw.tell())
1322 self.assertEquals(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001323 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001324
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001325 def check_flush_and_read(self, read_func):
1326 raw = self.BytesIO(b"abcdefghi")
1327 bufio = self.tp(raw)
1328
1329 self.assertEquals(b"ab", read_func(bufio, 2))
1330 bufio.write(b"12")
1331 self.assertEquals(b"ef", read_func(bufio, 2))
1332 self.assertEquals(6, bufio.tell())
1333 bufio.flush()
1334 self.assertEquals(6, bufio.tell())
1335 self.assertEquals(b"ghi", read_func(bufio))
1336 raw.seek(0, 0)
1337 raw.write(b"XYZ")
1338 # flush() resets the read buffer
1339 bufio.flush()
1340 bufio.seek(0, 0)
1341 self.assertEquals(b"XYZ", read_func(bufio, 3))
1342
1343 def test_flush_and_read(self):
1344 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1345
1346 def test_flush_and_readinto(self):
1347 def _readinto(bufio, n=-1):
1348 b = bytearray(n if n >= 0 else 9999)
1349 n = bufio.readinto(b)
1350 return bytes(b[:n])
1351 self.check_flush_and_read(_readinto)
1352
1353 def test_flush_and_peek(self):
1354 def _peek(bufio, n=-1):
1355 # This relies on the fact that the buffer can contain the whole
1356 # raw stream, otherwise peek() can return less.
1357 b = bufio.peek(n)
1358 if n != -1:
1359 b = b[:n]
1360 bufio.seek(len(b), 1)
1361 return b
1362 self.check_flush_and_read(_peek)
1363
1364 def test_flush_and_write(self):
1365 raw = self.BytesIO(b"abcdefghi")
1366 bufio = self.tp(raw)
1367
1368 bufio.write(b"123")
1369 bufio.flush()
1370 bufio.write(b"45")
1371 bufio.flush()
1372 bufio.seek(0, 0)
1373 self.assertEquals(b"12345fghi", raw.getvalue())
1374 self.assertEquals(b"12345fghi", bufio.read())
1375
1376 def test_threads(self):
1377 BufferedReaderTest.test_threads(self)
1378 BufferedWriterTest.test_threads(self)
1379
1380 def test_writes_and_peek(self):
1381 def _peek(bufio):
1382 bufio.peek(1)
1383 self.check_writes(_peek)
1384 def _peek(bufio):
1385 pos = bufio.tell()
1386 bufio.seek(-1, 1)
1387 bufio.peek(1)
1388 bufio.seek(pos, 0)
1389 self.check_writes(_peek)
1390
1391 def test_writes_and_reads(self):
1392 def _read(bufio):
1393 bufio.seek(-1, 1)
1394 bufio.read(1)
1395 self.check_writes(_read)
1396
1397 def test_writes_and_read1s(self):
1398 def _read1(bufio):
1399 bufio.seek(-1, 1)
1400 bufio.read1(1)
1401 self.check_writes(_read1)
1402
1403 def test_writes_and_readintos(self):
1404 def _read(bufio):
1405 bufio.seek(-1, 1)
1406 bufio.readinto(bytearray(1))
1407 self.check_writes(_read)
1408
Antoine Pitrou0473e562009-08-06 20:52:43 +00001409 def test_write_after_readahead(self):
1410 # Issue #6629: writing after the buffer was filled by readahead should
1411 # first rewind the raw stream.
1412 for overwrite_size in [1, 5]:
1413 raw = self.BytesIO(b"A" * 10)
1414 bufio = self.tp(raw, 4)
1415 # Trigger readahead
1416 self.assertEqual(bufio.read(1), b"A")
1417 self.assertEqual(bufio.tell(), 1)
1418 # Overwriting should rewind the raw stream if it needs so
1419 bufio.write(b"B" * overwrite_size)
1420 self.assertEqual(bufio.tell(), overwrite_size + 1)
1421 # If the write size was smaller than the buffer size, flush() and
1422 # check that rewind happens.
1423 bufio.flush()
1424 self.assertEqual(bufio.tell(), overwrite_size + 1)
1425 s = raw.getvalue()
1426 self.assertEqual(s,
1427 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1428
Antoine Pitrou66f9fea2010-01-31 23:20:26 +00001429 def test_truncate_after_read_or_write(self):
1430 raw = self.BytesIO(b"A" * 10)
1431 bufio = self.tp(raw, 100)
1432 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1433 self.assertEqual(bufio.truncate(), 2)
1434 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1435 self.assertEqual(bufio.truncate(), 4)
1436
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001437 def test_misbehaved_io(self):
1438 BufferedReaderTest.test_misbehaved_io(self)
1439 BufferedWriterTest.test_misbehaved_io(self)
1440
1441class CBufferedRandomTest(BufferedRandomTest):
1442 tp = io.BufferedRandom
1443
1444 def test_constructor(self):
1445 BufferedRandomTest.test_constructor(self)
1446 # The allocation can succeed on 32-bit builds, e.g. with more
1447 # than 2GB RAM and a 64-bit kernel.
1448 if sys.maxsize > 0x7FFFFFFF:
1449 rawio = self.MockRawIO()
1450 bufio = self.tp(rawio)
1451 self.assertRaises((OverflowError, MemoryError, ValueError),
1452 bufio.__init__, rawio, sys.maxsize)
1453
1454 def test_garbage_collection(self):
1455 CBufferedReaderTest.test_garbage_collection(self)
1456 CBufferedWriterTest.test_garbage_collection(self)
1457
1458class PyBufferedRandomTest(BufferedRandomTest):
1459 tp = pyio.BufferedRandom
1460
1461
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001462# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1463# properties:
1464# - A single output character can correspond to many bytes of input.
1465# - The number of input bytes to complete the character can be
1466# undetermined until the last input byte is received.
1467# - The number of input bytes can vary depending on previous input.
1468# - A single input byte can correspond to many characters of output.
1469# - The number of output characters can be undetermined until the
1470# last input byte is received.
1471# - The number of output characters can vary depending on previous input.
1472
1473class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1474 """
1475 For testing seek/tell behavior with a stateful, buffering decoder.
1476
1477 Input is a sequence of words. Words may be fixed-length (length set
1478 by input) or variable-length (period-terminated). In variable-length
1479 mode, extra periods are ignored. Possible words are:
1480 - 'i' followed by a number sets the input length, I (maximum 99).
1481 When I is set to 0, words are space-terminated.
1482 - 'o' followed by a number sets the output length, O (maximum 99).
1483 - Any other word is converted into a word followed by a period on
1484 the output. The output word consists of the input word truncated
1485 or padded out with hyphens to make its length equal to O. If O
1486 is 0, the word is output verbatim without truncating or padding.
1487 I and O are initially set to 1. When I changes, any buffered input is
1488 re-scanned according to the new I. EOF also terminates the last word.
1489 """
1490
1491 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001492 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001493 self.reset()
1494
1495 def __repr__(self):
1496 return '<SID %x>' % id(self)
1497
1498 def reset(self):
1499 self.i = 1
1500 self.o = 1
1501 self.buffer = bytearray()
1502
1503 def getstate(self):
1504 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1505 return bytes(self.buffer), i*100 + o
1506
1507 def setstate(self, state):
1508 buffer, io = state
1509 self.buffer = bytearray(buffer)
1510 i, o = divmod(io, 100)
1511 self.i, self.o = i ^ 1, o ^ 1
1512
1513 def decode(self, input, final=False):
1514 output = ''
1515 for b in input:
1516 if self.i == 0: # variable-length, terminated with period
1517 if b == ord('.'):
1518 if self.buffer:
1519 output += self.process_word()
1520 else:
1521 self.buffer.append(b)
1522 else: # fixed-length, terminate after self.i bytes
1523 self.buffer.append(b)
1524 if len(self.buffer) == self.i:
1525 output += self.process_word()
1526 if final and self.buffer: # EOF terminates the last word
1527 output += self.process_word()
1528 return output
1529
1530 def process_word(self):
1531 output = ''
1532 if self.buffer[0] == ord('i'):
1533 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1534 elif self.buffer[0] == ord('o'):
1535 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1536 else:
1537 output = self.buffer.decode('ascii')
1538 if len(output) < self.o:
1539 output += '-'*self.o # pad out with hyphens
1540 if self.o:
1541 output = output[:self.o] # truncate to output length
1542 output += '.'
1543 self.buffer = bytearray()
1544 return output
1545
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001546 codecEnabled = False
1547
1548 @classmethod
1549 def lookupTestDecoder(cls, name):
1550 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001551 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001552 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001553 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001554 incrementalencoder=None,
1555 streamreader=None, streamwriter=None,
1556 incrementaldecoder=cls)
1557
1558# Register the previous decoder for testing.
1559# Disabled by default, tests will enable it.
1560codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1561
1562
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001563class StatefulIncrementalDecoderTest(unittest.TestCase):
1564 """
1565 Make sure the StatefulIncrementalDecoder actually works.
1566 """
1567
1568 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001569 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001570 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001571 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001572 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001573 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001574 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001575 # I=0, O=6 (variable-length input, fixed-length output)
1576 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1577 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001578 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001579 # I=6, O=3 (fixed-length input > fixed-length output)
1580 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1581 # I=0, then 3; O=29, then 15 (with longer output)
1582 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1583 'a----------------------------.' +
1584 'b----------------------------.' +
1585 'cde--------------------------.' +
1586 'abcdefghijabcde.' +
1587 'a.b------------.' +
1588 '.c.------------.' +
1589 'd.e------------.' +
1590 'k--------------.' +
1591 'l--------------.' +
1592 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001593 ]
1594
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001595 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001596 # Try a few one-shot test cases.
1597 for input, eof, output in self.test_cases:
1598 d = StatefulIncrementalDecoder()
1599 self.assertEquals(d.decode(input, eof), output)
1600
1601 # Also test an unfinished decode, followed by forcing EOF.
1602 d = StatefulIncrementalDecoder()
1603 self.assertEquals(d.decode(b'oiabcd'), '')
1604 self.assertEquals(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001605
1606class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001607
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001608 def setUp(self):
1609 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1610 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001611 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001612
Guido van Rossumd0712812007-04-11 16:32:43 +00001613 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001614 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001615
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001616 def test_constructor(self):
1617 r = self.BytesIO(b"\xc3\xa9\n\n")
1618 b = self.BufferedReader(r, 1000)
1619 t = self.TextIOWrapper(b)
1620 t.__init__(b, encoding="latin1", newline="\r\n")
1621 self.assertEquals(t.encoding, "latin1")
1622 self.assertEquals(t.line_buffering, False)
1623 t.__init__(b, encoding="utf8", line_buffering=True)
1624 self.assertEquals(t.encoding, "utf8")
1625 self.assertEquals(t.line_buffering, True)
1626 self.assertEquals("\xe9\n", t.readline())
1627 self.assertRaises(TypeError, t.__init__, b, newline=42)
1628 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1629
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001630 def test_detach(self):
1631 r = self.BytesIO()
1632 b = self.BufferedWriter(r)
1633 t = self.TextIOWrapper(b)
1634 self.assertIs(t.detach(), b)
1635
1636 t = self.TextIOWrapper(b, encoding="ascii")
1637 t.write("howdy")
1638 self.assertFalse(r.getvalue())
1639 t.detach()
1640 self.assertEqual(r.getvalue(), b"howdy")
1641 self.assertRaises(ValueError, t.detach)
1642
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001643 def test_repr(self):
1644 raw = self.BytesIO("hello".encode("utf-8"))
1645 b = self.BufferedReader(raw)
1646 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001647 modname = self.TextIOWrapper.__module__
1648 self.assertEqual(repr(t),
1649 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1650 raw.name = "dummy"
1651 self.assertEqual(repr(t),
1652 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1653 raw.name = b"dummy"
1654 self.assertEqual(repr(t),
1655 "<%s.TextIOWrapper name=b'dummy' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001656
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001657 def test_line_buffering(self):
1658 r = self.BytesIO()
1659 b = self.BufferedWriter(r, 1000)
1660 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001661 t.write("X")
1662 self.assertEquals(r.getvalue(), b"") # No flush happened
1663 t.write("Y\nZ")
1664 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
1665 t.write("A\rB")
1666 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
1667
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001668 def test_encoding(self):
1669 # Check the encoding attribute is always set, and valid
1670 b = self.BytesIO()
1671 t = self.TextIOWrapper(b, encoding="utf8")
1672 self.assertEqual(t.encoding, "utf8")
1673 t = self.TextIOWrapper(b)
Georg Brandlab91fde2009-08-13 08:51:18 +00001674 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001675 codecs.lookup(t.encoding)
1676
1677 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001678 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001679 b = self.BytesIO(b"abc\n\xff\n")
1680 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001681 self.assertRaises(UnicodeError, t.read)
1682 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001683 b = self.BytesIO(b"abc\n\xff\n")
1684 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001685 self.assertRaises(UnicodeError, t.read)
1686 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001687 b = self.BytesIO(b"abc\n\xff\n")
1688 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001689 self.assertEquals(t.read(), "abc\n\n")
1690 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001691 b = self.BytesIO(b"abc\n\xff\n")
1692 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001693 self.assertEquals(t.read(), "abc\n\ufffd\n")
1694
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001695 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001696 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001697 b = self.BytesIO()
1698 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001699 self.assertRaises(UnicodeError, t.write, "\xff")
1700 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001701 b = self.BytesIO()
1702 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001703 self.assertRaises(UnicodeError, t.write, "\xff")
1704 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001705 b = self.BytesIO()
1706 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001707 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001708 t.write("abc\xffdef\n")
1709 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001710 self.assertEquals(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001711 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001712 b = self.BytesIO()
1713 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001714 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001715 t.write("abc\xffdef\n")
1716 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001717 self.assertEquals(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001718
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001719 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001720 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1721
1722 tests = [
1723 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001724 [ '', input_lines ],
1725 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1726 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1727 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001728 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001729 encodings = (
1730 'utf-8', 'latin-1',
1731 'utf-16', 'utf-16-le', 'utf-16-be',
1732 'utf-32', 'utf-32-le', 'utf-32-be',
1733 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001734
Guido van Rossum8358db22007-08-18 21:39:55 +00001735 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001736 # character in TextIOWrapper._pending_line.
1737 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001738 # XXX: str.encode() should return bytes
1739 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001740 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001741 for bufsize in range(1, 10):
1742 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001743 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1744 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001745 encoding=encoding)
1746 if do_reads:
1747 got_lines = []
1748 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001749 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001750 if c2 == '':
1751 break
1752 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001753 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001754 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001755 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001756
1757 for got_line, exp_line in zip(got_lines, exp_lines):
1758 self.assertEquals(got_line, exp_line)
1759 self.assertEquals(len(got_lines), len(exp_lines))
1760
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001761 def test_newlines_input(self):
1762 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001763 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1764 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001765 (None, normalized.decode("ascii").splitlines(True)),
1766 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001767 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1768 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1769 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001770 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001771 buf = self.BytesIO(testdata)
1772 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +00001773 self.assertEquals(txt.readlines(), expected)
1774 txt.seek(0)
1775 self.assertEquals(txt.read(), "".join(expected))
1776
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001777 def test_newlines_output(self):
1778 testdict = {
1779 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1780 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1781 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1782 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1783 }
1784 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1785 for newline, expected in tests:
1786 buf = self.BytesIO()
1787 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1788 txt.write("AAA\nB")
1789 txt.write("BB\nCCC\n")
1790 txt.write("X\rY\r\nZ")
1791 txt.flush()
1792 self.assertEquals(buf.closed, False)
1793 self.assertEquals(buf.getvalue(), expected)
1794
1795 def test_destructor(self):
1796 l = []
1797 base = self.BytesIO
1798 class MyBytesIO(base):
1799 def close(self):
1800 l.append(self.getvalue())
1801 base.close(self)
1802 b = MyBytesIO()
1803 t = self.TextIOWrapper(b, encoding="ascii")
1804 t.write("abc")
1805 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001806 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001807 self.assertEquals([b"abc"], l)
1808
1809 def test_override_destructor(self):
1810 record = []
1811 class MyTextIO(self.TextIOWrapper):
1812 def __del__(self):
1813 record.append(1)
1814 try:
1815 f = super().__del__
1816 except AttributeError:
1817 pass
1818 else:
1819 f()
1820 def close(self):
1821 record.append(2)
1822 super().close()
1823 def flush(self):
1824 record.append(3)
1825 super().flush()
1826 b = self.BytesIO()
1827 t = MyTextIO(b, encoding="ascii")
1828 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001829 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001830 self.assertEqual(record, [1, 2, 3])
1831
1832 def test_error_through_destructor(self):
1833 # Test that the exception state is not modified by a destructor,
1834 # even if close() fails.
1835 rawio = self.CloseFailureIO()
1836 def f():
1837 self.TextIOWrapper(rawio).xyzzy
1838 with support.captured_output("stderr") as s:
1839 self.assertRaises(AttributeError, f)
1840 s = s.getvalue().strip()
1841 if s:
1842 # The destructor *may* have printed an unraisable error, check it
1843 self.assertEqual(len(s.splitlines()), 1)
Georg Brandlab91fde2009-08-13 08:51:18 +00001844 self.assertTrue(s.startswith("Exception IOError: "), s)
1845 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00001846
Guido van Rossum9b76da62007-04-11 01:09:03 +00001847 # Systematic tests of the text I/O API
1848
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001849 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001850 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1851 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001852 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001853 f._CHUNK_SIZE = chunksize
1854 self.assertEquals(f.write("abc"), 3)
1855 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001856 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001857 f._CHUNK_SIZE = chunksize
1858 self.assertEquals(f.tell(), 0)
1859 self.assertEquals(f.read(), "abc")
1860 cookie = f.tell()
1861 self.assertEquals(f.seek(0), 0)
Benjamin Peterson6b59f772009-12-13 19:30:15 +00001862 self.assertEquals(f.read(None), "abc")
1863 f.seek(0)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001864 self.assertEquals(f.read(2), "ab")
1865 self.assertEquals(f.read(1), "c")
1866 self.assertEquals(f.read(1), "")
1867 self.assertEquals(f.read(), "")
1868 self.assertEquals(f.tell(), cookie)
1869 self.assertEquals(f.seek(0), 0)
1870 self.assertEquals(f.seek(0, 2), cookie)
1871 self.assertEquals(f.write("def"), 3)
1872 self.assertEquals(f.seek(cookie), cookie)
1873 self.assertEquals(f.read(), "def")
1874 if enc.startswith("utf"):
1875 self.multi_line_test(f, enc)
1876 f.close()
1877
1878 def multi_line_test(self, f, enc):
1879 f.seek(0)
1880 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001881 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001882 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00001883 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001884 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00001885 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001886 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001887 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001888 wlines.append((f.tell(), line))
1889 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001890 f.seek(0)
1891 rlines = []
1892 while True:
1893 pos = f.tell()
1894 line = f.readline()
1895 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00001896 break
1897 rlines.append((pos, line))
Guido van Rossum9b76da62007-04-11 01:09:03 +00001898 self.assertEquals(rlines, wlines)
1899
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001900 def test_telling(self):
1901 f = self.open(support.TESTFN, "w+", encoding="utf8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001902 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001903 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001904 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001905 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001906 p2 = f.tell()
1907 f.seek(0)
1908 self.assertEquals(f.tell(), p0)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001909 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001910 self.assertEquals(f.tell(), p1)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001911 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001912 self.assertEquals(f.tell(), p2)
1913 f.seek(0)
1914 for line in f:
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001915 self.assertEquals(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001916 self.assertRaises(IOError, f.tell)
1917 self.assertEquals(f.tell(), p2)
1918 f.close()
1919
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001920 def test_seeking(self):
1921 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00001922 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001923 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00001924 prefix = bytes(u_prefix.encode("utf-8"))
1925 self.assertEquals(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001926 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00001927 suffix = bytes(u_suffix.encode("utf-8"))
1928 line = prefix + suffix
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001929 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001930 f.write(line*2)
1931 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001932 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001933 s = f.read(prefix_size)
Guido van Rossum98297ee2007-11-06 21:34:58 +00001934 self.assertEquals(s, str(prefix, "ascii"))
Guido van Rossumd76e7792007-04-17 02:38:04 +00001935 self.assertEquals(f.tell(), prefix_size)
1936 self.assertEquals(f.readline(), u_suffix)
1937
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001938 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00001939 # Regression test for a specific bug
1940 data = b'\xe0\xbf\xbf\n'
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001941 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001942 f.write(data)
1943 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001944 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001945 f._CHUNK_SIZE # Just test that it exists
1946 f._CHUNK_SIZE = 2
1947 f.readline()
1948 f.tell()
1949
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001950 def test_seek_and_tell(self):
1951 #Test seek/tell using the StatefulIncrementalDecoder.
1952 # Make test faster by doing smaller seeks
1953 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001954
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001955 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001956 """Tell/seek to various points within a data stream and ensure
1957 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001958 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001959 f.write(data)
1960 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001961 f = self.open(support.TESTFN, encoding='test_decoder')
1962 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001963 decoded = f.read()
1964 f.close()
1965
Neal Norwitze2b07052008-03-18 19:52:05 +00001966 for i in range(min_pos, len(decoded) + 1): # seek positions
1967 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001968 f = self.open(support.TESTFN, encoding='test_decoder')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001969 self.assertEquals(f.read(i), decoded[:i])
1970 cookie = f.tell()
1971 self.assertEquals(f.read(j), decoded[i:i + j])
1972 f.seek(cookie)
1973 self.assertEquals(f.read(), decoded[i:])
1974 f.close()
1975
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001976 # Enable the test decoder.
1977 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001978
1979 # Run the tests.
1980 try:
1981 # Try each test case.
1982 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001983 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001984
1985 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001986 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1987 offset = CHUNK_SIZE - len(input)//2
1988 prefix = b'.'*offset
1989 # Don't bother seeking into the prefix (takes too long).
1990 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001991 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001992
1993 # Ensure our test decoder won't interfere with subsequent tests.
1994 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001995 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001996
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001997 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001998 data = "1234567890"
1999 tests = ("utf-16",
2000 "utf-16-le",
2001 "utf-16-be",
2002 "utf-32",
2003 "utf-32-le",
2004 "utf-32-be")
2005 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002006 buf = self.BytesIO()
2007 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002008 # Check if the BOM is written only once (see issue1753).
2009 f.write(data)
2010 f.write(data)
2011 f.seek(0)
2012 self.assertEquals(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002013 f.seek(0)
2014 self.assertEquals(f.read(), data * 2)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002015 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
2016
Benjamin Petersona1b49012009-03-31 23:11:32 +00002017 def test_unreadable(self):
2018 class UnReadable(self.BytesIO):
2019 def readable(self):
2020 return False
2021 txt = self.TextIOWrapper(UnReadable())
2022 self.assertRaises(IOError, txt.read)
2023
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002024 def test_read_one_by_one(self):
2025 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002026 reads = ""
2027 while True:
2028 c = txt.read(1)
2029 if not c:
2030 break
2031 reads += c
2032 self.assertEquals(reads, "AA\nBB")
2033
Benjamin Peterson6b59f772009-12-13 19:30:15 +00002034 def test_readlines(self):
2035 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2036 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2037 txt.seek(0)
2038 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2039 txt.seek(0)
2040 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2041
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002042 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002043 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002044 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002045 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002046 reads = ""
2047 while True:
2048 c = txt.read(128)
2049 if not c:
2050 break
2051 reads += c
2052 self.assertEquals(reads, "A"*127+"\nB")
2053
2054 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002055 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002056
2057 # read one char at a time
2058 reads = ""
2059 while True:
2060 c = txt.read(1)
2061 if not c:
2062 break
2063 reads += c
2064 self.assertEquals(reads, self.normalized)
2065
2066 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002067 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002068 txt._CHUNK_SIZE = 4
2069
2070 reads = ""
2071 while True:
2072 c = txt.read(4)
2073 if not c:
2074 break
2075 reads += c
2076 self.assertEquals(reads, self.normalized)
2077
2078 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002079 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002080 txt._CHUNK_SIZE = 4
2081
2082 reads = txt.read(4)
2083 reads += txt.read(4)
2084 reads += txt.readline()
2085 reads += txt.readline()
2086 reads += txt.readline()
2087 self.assertEquals(reads, self.normalized)
2088
2089 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002090 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002091 txt._CHUNK_SIZE = 4
2092
2093 reads = txt.read(4)
2094 reads += txt.read()
2095 self.assertEquals(reads, self.normalized)
2096
2097 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002098 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002099 txt._CHUNK_SIZE = 4
2100
2101 reads = txt.read(4)
2102 pos = txt.tell()
2103 txt.seek(0)
2104 txt.seek(pos)
2105 self.assertEquals(txt.read(4), "BBB\n")
2106
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002107 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002108 buffer = self.BytesIO(self.testdata)
2109 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002110
2111 self.assertEqual(buffer.seekable(), txt.seekable())
2112
Antoine Pitroue4501852009-05-14 18:55:55 +00002113 def test_append_bom(self):
2114 # The BOM is not written again when appending to a non-empty file
2115 filename = support.TESTFN
2116 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2117 with self.open(filename, 'w', encoding=charset) as f:
2118 f.write('aaa')
2119 pos = f.tell()
2120 with self.open(filename, 'rb') as f:
2121 self.assertEquals(f.read(), 'aaa'.encode(charset))
2122
2123 with self.open(filename, 'a', encoding=charset) as f:
2124 f.write('xxx')
2125 with self.open(filename, 'rb') as f:
2126 self.assertEquals(f.read(), 'aaaxxx'.encode(charset))
2127
2128 def test_seek_bom(self):
2129 # Same test, but when seeking manually
2130 filename = support.TESTFN
2131 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2132 with self.open(filename, 'w', encoding=charset) as f:
2133 f.write('aaa')
2134 pos = f.tell()
2135 with self.open(filename, 'r+', encoding=charset) as f:
2136 f.seek(pos)
2137 f.write('zzz')
2138 f.seek(0)
2139 f.write('bbb')
2140 with self.open(filename, 'rb') as f:
2141 self.assertEquals(f.read(), 'bbbzzz'.encode(charset))
2142
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002143 def test_errors_property(self):
2144 with self.open(support.TESTFN, "w") as f:
2145 self.assertEqual(f.errors, "strict")
2146 with self.open(support.TESTFN, "w", errors="replace") as f:
2147 self.assertEqual(f.errors, "replace")
2148
Antoine Pitroue4501852009-05-14 18:55:55 +00002149
Amaury Forgeot d'Arcaf0312a2009-08-29 23:19:16 +00002150 def test_threads_write(self):
2151 # Issue6750: concurrent writes could duplicate data
2152 event = threading.Event()
2153 with self.open(support.TESTFN, "w", buffering=1) as f:
2154 def run(n):
2155 text = "Thread%03d\n" % n
2156 event.wait()
2157 f.write(text)
2158 threads = [threading.Thread(target=lambda n=x: run(n))
2159 for x in range(20)]
2160 for t in threads:
2161 t.start()
2162 time.sleep(0.02)
2163 event.set()
2164 for t in threads:
2165 t.join()
2166 with self.open(support.TESTFN) as f:
2167 content = f.read()
2168 for n in range(20):
2169 self.assertEquals(content.count("Thread%03d\n" % n), 1)
2170
Antoine Pitroufaf90072010-05-03 16:58:19 +00002171 def test_flush_error_on_close(self):
2172 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2173 def bad_flush():
2174 raise IOError()
2175 txt.flush = bad_flush
2176 self.assertRaises(IOError, txt.close) # exception not swallowed
2177
2178 def test_multi_close(self):
2179 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2180 txt.close()
2181 txt.close()
2182 txt.close()
2183 self.assertRaises(ValueError, txt.flush)
2184
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002185class CTextIOWrapperTest(TextIOWrapperTest):
2186
2187 def test_initialization(self):
2188 r = self.BytesIO(b"\xc3\xa9\n\n")
2189 b = self.BufferedReader(r, 1000)
2190 t = self.TextIOWrapper(b)
2191 self.assertRaises(TypeError, t.__init__, b, newline=42)
2192 self.assertRaises(ValueError, t.read)
2193 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2194 self.assertRaises(ValueError, t.read)
2195
2196 def test_garbage_collection(self):
2197 # C TextIOWrapper objects are collected, and collecting them flushes
2198 # all data to disk.
2199 # The Python version has __del__, so it ends in gc.garbage instead.
2200 rawio = io.FileIO(support.TESTFN, "wb")
2201 b = self.BufferedWriter(rawio)
2202 t = self.TextIOWrapper(b, encoding="ascii")
2203 t.write("456def")
2204 t.x = t
2205 wr = weakref.ref(t)
2206 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002207 support.gc_collect()
Georg Brandlab91fde2009-08-13 08:51:18 +00002208 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002209 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002210 self.assertEqual(f.read(), b"456def")
2211
2212class PyTextIOWrapperTest(TextIOWrapperTest):
2213 pass
2214
2215
2216class IncrementalNewlineDecoderTest(unittest.TestCase):
2217
2218 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002219 # UTF-8 specific tests for a newline decoder
2220 def _check_decode(b, s, **kwargs):
2221 # We exercise getstate() / setstate() as well as decode()
2222 state = decoder.getstate()
2223 self.assertEquals(decoder.decode(b, **kwargs), s)
2224 decoder.setstate(state)
2225 self.assertEquals(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002226
Antoine Pitrou180a3362008-12-14 16:36:46 +00002227 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002228
Antoine Pitrou180a3362008-12-14 16:36:46 +00002229 _check_decode(b'\xe8', "")
2230 _check_decode(b'\xa2', "")
2231 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002232
Antoine Pitrou180a3362008-12-14 16:36:46 +00002233 _check_decode(b'\xe8', "")
2234 _check_decode(b'\xa2', "")
2235 _check_decode(b'\x88', "\u8888")
2236
2237 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002238 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2239
Antoine Pitrou180a3362008-12-14 16:36:46 +00002240 decoder.reset()
2241 _check_decode(b'\n', "\n")
2242 _check_decode(b'\r', "")
2243 _check_decode(b'', "\n", final=True)
2244 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002245
Antoine Pitrou180a3362008-12-14 16:36:46 +00002246 _check_decode(b'\r', "")
2247 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002248
Antoine Pitrou180a3362008-12-14 16:36:46 +00002249 _check_decode(b'\r\r\n', "\n\n")
2250 _check_decode(b'\r', "")
2251 _check_decode(b'\r', "\n")
2252 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002253
Antoine Pitrou180a3362008-12-14 16:36:46 +00002254 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2255 _check_decode(b'\xe8\xa2\x88', "\u8888")
2256 _check_decode(b'\n', "\n")
2257 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2258 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002259
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002260 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002261 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002262 if encoding is not None:
2263 encoder = codecs.getincrementalencoder(encoding)()
2264 def _decode_bytewise(s):
2265 # Decode one byte at a time
2266 for b in encoder.encode(s):
2267 result.append(decoder.decode(bytes([b])))
2268 else:
2269 encoder = None
2270 def _decode_bytewise(s):
2271 # Decode one char at a time
2272 for c in s:
2273 result.append(decoder.decode(c))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002274 self.assertEquals(decoder.newlines, None)
2275 _decode_bytewise("abc\n\r")
2276 self.assertEquals(decoder.newlines, '\n')
2277 _decode_bytewise("\nabc")
2278 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2279 _decode_bytewise("abc\r")
2280 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2281 _decode_bytewise("abc")
2282 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
2283 _decode_bytewise("abc\r")
2284 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
2285 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002286 input = "abc"
2287 if encoder is not None:
2288 encoder.reset()
2289 input = encoder.encode(input)
2290 self.assertEquals(decoder.decode(input), "abc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002291 self.assertEquals(decoder.newlines, None)
2292
2293 def test_newline_decoder(self):
2294 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002295 # None meaning the IncrementalNewlineDecoder takes unicode input
2296 # rather than bytes input
2297 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002298 'utf-16', 'utf-16-le', 'utf-16-be',
2299 'utf-32', 'utf-32-le', 'utf-32-be',
2300 )
2301 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002302 decoder = enc and codecs.getincrementaldecoder(enc)()
2303 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2304 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002305 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002306 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2307 self.check_newline_decoding_utf8(decoder)
2308
Antoine Pitrou66913e22009-03-06 23:40:56 +00002309 def test_newline_bytes(self):
2310 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2311 def _check(dec):
2312 self.assertEquals(dec.newlines, None)
2313 self.assertEquals(dec.decode("\u0D00"), "\u0D00")
2314 self.assertEquals(dec.newlines, None)
2315 self.assertEquals(dec.decode("\u0A00"), "\u0A00")
2316 self.assertEquals(dec.newlines, None)
2317 dec = self.IncrementalNewlineDecoder(None, translate=False)
2318 _check(dec)
2319 dec = self.IncrementalNewlineDecoder(None, translate=True)
2320 _check(dec)
2321
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002322class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2323 pass
2324
2325class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2326 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002327
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002328
Guido van Rossum01a27522007-03-07 01:00:12 +00002329# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002330
Guido van Rossum5abbf752007-08-27 17:39:33 +00002331class MiscIOTest(unittest.TestCase):
2332
Barry Warsaw40e82462008-11-20 20:14:50 +00002333 def tearDown(self):
2334 support.unlink(support.TESTFN)
2335
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002336 def test___all__(self):
2337 for name in self.io.__all__:
2338 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002339 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002340 if name == "open":
2341 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002342 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002343 self.assertTrue(issubclass(obj, Exception), name)
2344 elif not name.startswith("SEEK_"):
2345 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002346
Barry Warsaw40e82462008-11-20 20:14:50 +00002347 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002348 f = self.open(support.TESTFN, "wb", buffering=0)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002349 self.assertEquals(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002350 f.close()
2351
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002352 f = self.open(support.TESTFN, "U")
Barry Warsaw40e82462008-11-20 20:14:50 +00002353 self.assertEquals(f.name, support.TESTFN)
2354 self.assertEquals(f.buffer.name, support.TESTFN)
2355 self.assertEquals(f.buffer.raw.name, support.TESTFN)
2356 self.assertEquals(f.mode, "U")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002357 self.assertEquals(f.buffer.mode, "rb")
2358 self.assertEquals(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002359 f.close()
2360
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002361 f = self.open(support.TESTFN, "w+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002362 self.assertEquals(f.mode, "w+")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002363 self.assertEquals(f.buffer.mode, "rb+") # Does it really matter?
2364 self.assertEquals(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002365
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002366 g = self.open(f.fileno(), "wb", closefd=False)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002367 self.assertEquals(g.mode, "wb")
2368 self.assertEquals(g.raw.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002369 self.assertEquals(g.name, f.fileno())
2370 self.assertEquals(g.raw.name, f.fileno())
2371 f.close()
2372 g.close()
2373
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002374 def test_io_after_close(self):
2375 for kwargs in [
2376 {"mode": "w"},
2377 {"mode": "wb"},
2378 {"mode": "w", "buffering": 1},
2379 {"mode": "w", "buffering": 2},
2380 {"mode": "wb", "buffering": 0},
2381 {"mode": "r"},
2382 {"mode": "rb"},
2383 {"mode": "r", "buffering": 1},
2384 {"mode": "r", "buffering": 2},
2385 {"mode": "rb", "buffering": 0},
2386 {"mode": "w+"},
2387 {"mode": "w+b"},
2388 {"mode": "w+", "buffering": 1},
2389 {"mode": "w+", "buffering": 2},
2390 {"mode": "w+b", "buffering": 0},
2391 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002392 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002393 f.close()
2394 self.assertRaises(ValueError, f.flush)
2395 self.assertRaises(ValueError, f.fileno)
2396 self.assertRaises(ValueError, f.isatty)
2397 self.assertRaises(ValueError, f.__iter__)
2398 if hasattr(f, "peek"):
2399 self.assertRaises(ValueError, f.peek, 1)
2400 self.assertRaises(ValueError, f.read)
2401 if hasattr(f, "read1"):
2402 self.assertRaises(ValueError, f.read1, 1024)
2403 if hasattr(f, "readinto"):
2404 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2405 self.assertRaises(ValueError, f.readline)
2406 self.assertRaises(ValueError, f.readlines)
2407 self.assertRaises(ValueError, f.seek, 0)
2408 self.assertRaises(ValueError, f.tell)
2409 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002410 self.assertRaises(ValueError, f.write,
2411 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002412 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002413 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002414
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002415 def test_blockingioerror(self):
2416 # Various BlockingIOError issues
2417 self.assertRaises(TypeError, self.BlockingIOError)
2418 self.assertRaises(TypeError, self.BlockingIOError, 1)
2419 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2420 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2421 b = self.BlockingIOError(1, "")
2422 self.assertEqual(b.characters_written, 0)
2423 class C(str):
2424 pass
2425 c = C("")
2426 b = self.BlockingIOError(1, c)
2427 c.b = b
2428 b.c = c
2429 wr = weakref.ref(c)
2430 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002431 support.gc_collect()
Georg Brandlab91fde2009-08-13 08:51:18 +00002432 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002433
2434 def test_abcs(self):
2435 # Test the visible base classes are ABCs.
2436 self.assertTrue(isinstance(self.IOBase, abc.ABCMeta))
2437 self.assertTrue(isinstance(self.RawIOBase, abc.ABCMeta))
2438 self.assertTrue(isinstance(self.BufferedIOBase, abc.ABCMeta))
2439 self.assertTrue(isinstance(self.TextIOBase, abc.ABCMeta))
2440
2441 def _check_abc_inheritance(self, abcmodule):
2442 with self.open(support.TESTFN, "wb", buffering=0) as f:
2443 self.assertTrue(isinstance(f, abcmodule.IOBase))
2444 self.assertTrue(isinstance(f, abcmodule.RawIOBase))
2445 self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
2446 self.assertFalse(isinstance(f, abcmodule.TextIOBase))
2447 with self.open(support.TESTFN, "wb") as f:
2448 self.assertTrue(isinstance(f, abcmodule.IOBase))
2449 self.assertFalse(isinstance(f, abcmodule.RawIOBase))
2450 self.assertTrue(isinstance(f, abcmodule.BufferedIOBase))
2451 self.assertFalse(isinstance(f, abcmodule.TextIOBase))
2452 with self.open(support.TESTFN, "w") as f:
2453 self.assertTrue(isinstance(f, abcmodule.IOBase))
2454 self.assertFalse(isinstance(f, abcmodule.RawIOBase))
2455 self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
2456 self.assertTrue(isinstance(f, abcmodule.TextIOBase))
2457
2458 def test_abc_inheritance(self):
2459 # Test implementations inherit from their respective ABCs
2460 self._check_abc_inheritance(self)
2461
2462 def test_abc_inheritance_official(self):
2463 # Test implementations inherit from the official ABCs of the
2464 # baseline "io" module.
2465 self._check_abc_inheritance(io)
2466
2467class CMiscIOTest(MiscIOTest):
2468 io = io
2469
2470class PyMiscIOTest(MiscIOTest):
2471 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002472
Guido van Rossum28524c72007-02-27 05:47:44 +00002473def test_main():
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002474 tests = (CIOTest, PyIOTest,
2475 CBufferedReaderTest, PyBufferedReaderTest,
2476 CBufferedWriterTest, PyBufferedWriterTest,
2477 CBufferedRWPairTest, PyBufferedRWPairTest,
2478 CBufferedRandomTest, PyBufferedRandomTest,
2479 StatefulIncrementalDecoderTest,
2480 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2481 CTextIOWrapperTest, PyTextIOWrapperTest,
2482 CMiscIOTest, PyMiscIOTest,)
2483
2484 # Put the namespaces of the IO module we are testing and some useful mock
2485 # classes in the __dict__ of each test.
2486 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
2487 MockNonBlockWriterIO)
2488 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2489 c_io_ns = {name : getattr(io, name) for name in all_members}
2490 py_io_ns = {name : getattr(pyio, name) for name in all_members}
2491 globs = globals()
2492 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2493 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2494 # Avoid turning open into a bound method.
2495 py_io_ns["open"] = pyio.OpenWrapper
2496 for test in tests:
2497 if test.__name__.startswith("C"):
2498 for name, obj in c_io_ns.items():
2499 setattr(test, name, obj)
2500 elif test.__name__.startswith("Py"):
2501 for name, obj in py_io_ns.items():
2502 setattr(test, name, obj)
2503
2504 support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002505
2506if __name__ == "__main__":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002507 test_main()