blob: 3a3e074ba62ebfdc597f4531334d8a5bd487a0ef [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import threading
27import random
Guido van Rossum28524c72007-02-27 05:47:44 +000028import unittest
Benjamin Peterson59406a92009-03-26 17:10:29 +000029import warnings
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000030import weakref
31import gc
32import abc
Antoine Pitrou16b11de2010-08-21 19:17:57 +000033import signal
34import errno
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000035from itertools import chain, cycle, count
36from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000038
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000039import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000040import io # C implementation of io
41import _pyio as pyio # Python implementation of io
Guido van Rossum28524c72007-02-27 05:47:44 +000042
Guido van Rossuma9e20242007-03-08 00:43:48 +000043
def _default_chunk_size():
    """Return the ``_CHUNK_SIZE`` of a text stream opened by ``open()``."""
    # Any readable text file will do; this module's own source is always
    # available.  latin1 is used so that opening cannot fail on decoding.
    with open(__file__, "r", encoding="latin1") as stream:
        chunk_size = stream._CHUNK_SIZE
    return chunk_size
48
49
class MockRawIO:
    """Scripted stand-in for a raw I/O object.

    Reads are served from a queue of pre-canned chunks and every write is
    recorded, so that tests can observe how a buffering layer drives the raw
    stream underneath it.
    """

    def __init__(self, read_stack=()):
        # Chunks handed out by read()/readinto(), oldest first.  A ``None``
        # entry is passed through to the caller unchanged.
        self._read_stack = list(read_stack)
        # Every payload passed to write(), stored as immutable bytes.
        self._write_stack = []
        # Total number of read()/readinto() calls.
        self._reads = 0
        # Number of those calls made after the queue was already empty.
        self._extraneous_reads = 0

    def read(self, n=None):
        """Return the next queued chunk whole (``n`` is ignored).

        Returns ``b""`` once the queue is exhausted and counts that call as
        an extraneous read.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an empty queue means "end of data"; anything else
            # (including KeyboardInterrupt) must propagate.
            self._extraneous_reads += 1
            return b""

    def write(self, b):
        """Record a bytes copy of ``b`` and report it as fully written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        # Arbitrary fixed descriptor number that tests can compare against.
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy data from the head of the queue into ``buf``.

        Returns the number of bytes copied, ``0`` when the queue is empty
        (counted as an extraneous read), or ``None`` when the head of the
        queue is ``None``.  A chunk larger than ``buf`` is split: the
        remainder stays at the head of the queue for the next call.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            self._extraneous_reads += 1
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            # The whole chunk fits: consume it.
            del self._read_stack[0]
            buf[:n] = data
            return n
        # Partial delivery: fill the buffer and keep the tail queued.
        buf[:] = data[:max_len]
        self._read_stack[0] = data[max_len:]
        return max_len

    def truncate(self, pos=None):
        """Pretend to truncate; simply echoes ``pos`` back."""
        return pos
111
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO mixed into ``io.RawIOBase`` (the C implementation of io)."""
    pass
114
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO mixed into ``pyio.RawIOBase`` (the Python implementation)."""
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000117
Guido van Rossuma9e20242007-03-08 00:43:48 +0000118
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods report deliberately bogus results."""

    def write(self, b):
        # Record the data normally, then claim twice as many bytes written.
        reported = super().write(b)
        return reported * 2

    def read(self, n=None):
        # Hand back the queued chunk repeated twice.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        # A negative position is never valid.
        return -123

    def tell(self):
        # Likewise an impossible (negative) position.
        return -456

    def readinto(self, buf):
        # Fill the buffer as usual but report five times its capacity.
        super().readinto(buf)
        capacity = len(buf)
        return capacity * 5
135
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO mixed into ``io.RawIOBase`` (the C implementation)."""
    pass
138
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO mixed into ``pyio.RawIOBase`` (Python implementation)."""
    pass
141
142
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() fails with IOError."""

    # Class-level default; the first close() call shadows it per instance.
    closed = 0

    def close(self):
        # Once marked closed, further calls are silent no-ops.
        if self.closed:
            return
        # Mark as closed *before* raising so that a retry does not fail again.
        self.closed = 1
        raise IOError
150
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO mixed into ``io.RawIOBase`` (the C implementation)."""
    pass
153
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO mixed into ``pyio.RawIOBase`` (Python implementation)."""
    pass
156
157
class MockFileIO:
    """Mixin that logs the size of every read made on the stream.

    It must be combined with a concrete stream class: construction and the
    actual reading are delegated to the next class in the MRO.
    """

    def __init__(self, data):
        # One entry per read()/readinto() call: the byte count obtained, or
        # None when the underlying call returned None.
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        if chunk is None:
            size = None
        else:
            size = len(chunk)
        self.read_history.append(size)
        return chunk

    def readinto(self, b):
        # readinto() already returns a count (or None), so log it verbatim.
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000173
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO layered over ``io.BytesIO`` (the C implementation of io)."""
    pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000176
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO layered over ``pyio.BytesIO`` (the Python implementation)."""
    pass
179
180
class MockNonBlockWriterIO:
    """Raw writer mock that can simulate a write that would block.

    Subclasses must provide a ``BlockingIOError`` attribute naming the
    exception class raised by write() when the blocker is hit.
    """

    def __init__(self):
        # Accepted payloads, in write order.
        self._write_stack = []
        # Byte sequence that triggers a simulated block, or None when disarmed.
        self._blocker_char = None

    def pop_written(self):
        """Return everything accepted so far and reset the record."""
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Accept ``b``, or only the part preceding an armed blocker.

        When the blocker occurs in ``b`` the bytes before it are recorded,
        the blocker is disarmed, and ``self.BlockingIOError`` is raised with
        the number of bytes accepted as its third argument.
        """
        payload = bytes(b)
        blocker = self._blocker_char
        if blocker:
            cut = payload.find(blocker)
            if cut != -1:
                self._blocker_char = None
                self._write_stack.append(payload[:cut])
                raise self.BlockingIOError(0, "test blocking", cut)
        self._write_stack.append(payload)
        return len(payload)
219
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO mixed into ``io.RawIOBase`` (C implementation)."""
    # Exception class that MockNonBlockWriterIO.write() raises on a block.
    BlockingIOError = io.BlockingIOError
222
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO mixed into ``pyio.RawIOBase`` (Python impl.)."""
    # Exception class that MockNonBlockWriterIO.write() raises on a block.
    BlockingIOError = pyio.BlockingIOError
225
Guido van Rossuma9e20242007-03-08 00:43:48 +0000226
class IOTest(unittest.TestCase):
    """Implementation-independent tests for opening, reading and writing.

    The tests reach the code under test through attributes of ``self``
    (``open``, ``FileIO``, ``BytesIO``, ``IOBase``, ``RawIOBase``,
    ``BufferedIOBase``, ``TextIOBase``); none of them is defined in this
    class, so a concrete test class has to supply them.
    """

    def setUp(self):
        # Start every test without a stale scratch file.
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        """Run a fixed write/seek/truncate script against writable ``f``.

        When the script completes, the first 12 bytes of the stream are
        b"hello world\\n".
        """
        self.assertEqual(f.write(b"blah."), 5)
        # truncate() must not move the stream position.
        f.truncate(0)
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        # Again: truncating leaves the position where it was.
        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Run a fixed read/seek script against readable ``f``.

        ``f`` must contain b"hello world\\n".  With ``buffered`` true the
        argument-less read() (read to EOF) is exercised as well.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        # Short read at EOF: only the first two bytes are overwritten.
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by the large-file tests; just past the signed 32-bit range.
    LARGE = 2**31

    def large_file_ops(self, f):
        """Seek, write, truncate and read around offset ``LARGE`` in ``f``."""
        # Real assertions rather than ``assert`` statements, which are
        # stripped when running under -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        # truncate() must not move the stream position.
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(IOError, fp.read)
                self.assertRaises(IOError, fp.readline)
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(IOError, fp.write, b"blah")
            self.assertRaises(IOError, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(IOError, fp.write, "blah")
            self.assertRaises(IOError, fp.writelines, ["blah\n"])

    def test_raw_file_io(self):
        # buffering=0 selects an unbuffered binary file.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            # A size limit may split a line across two calls.
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources: it takes
        # a long time to build the >2GB file and takes >2GB of disk space,
        # therefore the "largefile" resource must be enabled to run it.
        # The skip is reported through unittest (as test_unbounded_file
        # does) rather than printed to stderr followed by a silent pass.
        if sys.platform.startswith('win') or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                self.skipTest(
                    "test requires %d bytes and a long time to run; "
                    "use 'regrtest.py -u largefile test_io' to enable it"
                    % self.LARGE)
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            # The file must also be closed when the body raises.
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        # Each overridden hook appends its own marker, so ``record`` shows
        # the order in which they ran when the object was destroyed.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                # The base class may not define __del__ at all.
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        """Check that destroying a ``base`` subclass runs __del__, close()
        and flush(), in that order."""
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        # Data written before the file is closed must be readable afterwards.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        a = array.array('i', range(10))
        # Size of the array contents in bytes.  Computed from itemsize
        # because array.tostring() is deprecated and gone in Python 3.9.
        n = len(a) * a.itemsize
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            # Second file object sharing f's descriptor without owning it.
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r",
                          closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        # Self-reference: the object becomes part of a reference cycle.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise IOError()
        # Shadow flush() on the instance so that close() hits the failure.
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed

    def test_multi_close(self):
        # Closing repeatedly is harmless; using the closed file is an error.
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)
556
class CIOTest(IOTest):
    # IOTest variant for the C implementation of io.  The attributes IOTest
    # reads from self (open, FileIO, BytesIO, ...) are not set here --
    # presumably they are injected elsewhere in this file; TODO confirm.
    pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000559
class PyIOTest(IOTest):
    # IOTest variant for the Python implementation of io.  The attributes
    # IOTest reads from self (open, FileIO, BytesIO, ...) are not set here --
    # presumably they are injected elsewhere in this file; TODO confirm.
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000562
Guido van Rossuma9e20242007-03-08 00:43:48 +0000563
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    #
    # Mixin: the concrete test class supplies ``tp`` (the buffered class
    # under test) plus the ``MockRawIO`` and ``CloseFailureIO`` raw stream
    # classes, all read from ``self``.

    def test_detach(self):
        raw = self.MockRawIO()
        buf = self.tp(raw)
        self.assertIs(buf.detach(), raw)
        # A second detach() has nothing left to hand back.
        self.assertRaises(ValueError, buf.detach)

    def test_fileno(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        # assertEqual, not the deprecated assertEquals alias (removed in
        # Python 3.12).
        self.assertEqual(42, bufio.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Invalid whence
        self.assertRaises(ValueError, bufio.seek, 0, -1)
        self.assertRaises(ValueError, bufio.seek, 0, 3)

    def test_override_destructor(self):
        tp = self.tp
        # Each overridden hook appends its own marker, so ``record`` shows
        # which hooks ran on destruction and in what order.
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                # The base class may not define __del__ at all.
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        # Must be queried before the object is destroyed.
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        # flush() is only expected for writable buffered objects.
        if writable:
            self.assertEqual(record, [1, 2, 3])
        else:
            self.assertEqual(record, [1, 2])

    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        def _with():
            with bufio:
                pass
        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            # The temporary buffered object dies while the AttributeError
            # for the missing attribute is propagating.
            self.tp(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception IOError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    def test_repr(self):
        raw = self.MockRawIO()
        b = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(b), "<%s>" % clsname)
        # The repr picks up the raw stream's name once it has one.
        raw.name = "dummy"
        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)

    def test_flush_error_on_close(self):
        raw = self.MockRawIO()
        def bad_flush():
            raise IOError()
        # Make the raw stream's flush() fail before wrapping it.
        raw.flush = bad_flush
        b = self.tp(raw)
        self.assertRaises(IOError, b.close) # exception not swallowed

    def test_multi_close(self):
        # Closing repeatedly is harmless; using the closed object is an error.
        raw = self.MockRawIO()
        b = self.tp(raw)
        b.close()
        b.close()
        b.close()
        self.assertRaises(ValueError, b.flush)
671
Guido van Rossum78892e42007-04-06 17:31:18 +0000672
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Implementation-independent tests for buffered readers.  The class under
    # test is read from ``self.tp``, which concrete subclasses bind; the raw
    # stream fixtures (MockRawIO, MockFileIO, MisbehavedRawIO) and ``open``
    # are likewise looked up on ``self``.
    read_mode = "rb"

    def test_constructor(self):
        # __init__ may be called again on a live object with valid sizes;
        # non-positive buffer sizes are rejected with ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        # A successful re-initialisation after the failures makes the object
        # usable again.
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_read(self):
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # read1() is checked against the raw object's read counter
        # (rawio._reads) after every call.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        # A short read only overwrites the leading byte of the target.
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")

    def test_readlines(self):
        def bufio():
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes requested from the buffered reader,
        # sizes expected in the raw object's read_history.
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

    def test_read_past_eof(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            s = bytes(bytearray(values))
            with self.open(support.TESTFN, "wb") as f:
                f.write(s)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def reader():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            s = bufio.read(n)
                            if not s:
                                break
                            # list.append() is atomic
                            results.append(s)
                    except Exception as e:
                        # Record the failure for the main thread, then let
                        # it propagate inside the worker as well.
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=reader) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                s = b''.join(results)
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(s.count(c), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            rawio = self.MockRawIO([b"x" * n])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            # Simple case: one raw read is enough to satisfy the request.
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
849
850
class CBufferedReaderTest(BufferedReaderTest):
    # Runs the BufferedReaderTest cases against io.BufferedReader and adds
    # checks that are only asserted for this implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed __init__, the object must refuse to read.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The test creates a real file; make sure it does not outlive the test.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Self-reference: only the cyclic collector can reclaim the object.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000891
class PyBufferedReaderTest(BufferedReaderTest):
    # Binds the class under test to pyio.BufferedReader so the inherited
    # BufferedReaderTest cases run against that implementation.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000894
Guido van Rossuma9e20242007-03-08 00:43:48 +0000895
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Implementation-independent tests for buffered writers.  The class under
    # test is read from ``self.tp``, which concrete subclasses bind; raw
    # stream fixtures and ``open`` are likewise looked up on ``self``.
    write_mode = "wb"

    def test_constructor(self):
        # __init__ may be called again on a live object with valid sizes;
        # non-positive buffer sizes are rejected with ValueError.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # Buffered data must reach the raw object when detach() is called.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # intermediate_func(bufio) is invoked after every write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_destructor(self):
        # Dropping the last reference must push buffered data to the raw
        # object.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # The test creates a real file; make sure it does not outlive the test.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def writer():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        # Record the failure for the main thread, then let
                        # it propagate inside the worker as well.
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=writer) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing a third positional argument must emit exactly one
        # DeprecationWarning with the expected message.
        with support.check_warnings() as w:
            warnings.simplefilter("always", DeprecationWarning)
            self.tp(self.MockRawIO(), 8, 12)
            self.assertEqual(len(w.warnings), 1)
            warning = w.warnings[0]
            self.assertIs(warning.category, DeprecationWarning)
            self.assertEqual(str(warning.message),
                             "max_buffer_size is deprecated")
1114
1115
class CBufferedWriterTest(BufferedWriterTest):
    # Runs the BufferedWriterTest cases against io.BufferedWriter and adds
    # checks that are only asserted for this implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed __init__, the object must refuse to write.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The test creates a real file; make sure it does not outlive the test.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Self-reference: only the cyclic collector can reclaim the object.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1153
1154
class PyBufferedWriterTest(BufferedWriterTest):
    # Binds the class under test to pyio.BufferedWriter so the inherited
    # BufferedWriterTest cases run against that implementation.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001157
class BufferedRWPairTest(unittest.TestCase):
    # Implementation-independent tests for the reader/writer pair class.
    # The class under test is read from ``self.tp``, which concrete
    # subclasses bind; MockRawIO, BytesIO and UnsupportedOperation are
    # looked up on ``self``.

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_detach(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_deprecation(self):
        # Passing a fourth positional argument must emit exactly one
        # DeprecationWarning with the expected message.
        with support.check_warnings() as w:
            warnings.simplefilter("always", DeprecationWarning)
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
            self.assertEqual(len(w.warnings), 1)
            warning = w.warnings[0]
            self.assertIs(warning.category, DeprecationWarning)
            self.assertEqual(str(warning.message),
                             "max_buffer_size is deprecated")

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        # An explicit None hint must behave like no hint at all (the same
        # three cases BufferedReaderTest.test_readlines checks).
        self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # peek() may return more than requested, hence startswith(); it must
        # not consume what the following read() returns.
        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        # The pair must report a tty when either side does.
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001280
class CBufferedRWPairTest(BufferedRWPairTest):
    # Binds the class under test to io.BufferedRWPair so the inherited
    # BufferedRWPairTest cases run against that implementation.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001283
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Binds the class under test to pyio.BufferedRWPair so the inherited
    # BufferedRWPairTest cases run against that implementation.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001286
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001287
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Tests for buffered objects that both read and write.  Inherits every
    # reader and writer case and adds checks for interleaved reads, writes
    # and seeks.  Tests defined in both parents are overridden here to run
    # each parent's version explicitly.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # read_func(bufio[, n]) must return the bytes read and advance the
        # stream position by their length.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            # peek() does not advance the position; do it by hand.
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)
        def _peek(bufio):
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek)

    def test_writes_and_reads(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            s = raw.getvalue()
            self.assertEqual(s,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)
1442
class CBufferedRandomTest(BufferedRandomTest):
    # Runs the BufferedRandomTest cases against io.BufferedRandom and reuses
    # the garbage-collection checks of the C reader and writer test classes.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        expected_errors = (OverflowError, MemoryError, ValueError)
        self.assertRaises(expected_errors,
                          buffered.__init__, raw_stream, sys.maxsize)

    def test_garbage_collection(self):
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)
1458 CBufferedWriterTest.test_garbage_collection(self)
1459
class PyBufferedRandomTest(BufferedRandomTest):
    # Binds the class under test to pyio.BufferedRandom so the inherited
    # BufferedRandomTest cases run against that implementation.
    tp = pyio.BufferedRandom
1462
1463
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001464# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1465# properties:
1466# - A single output character can correspond to many bytes of input.
1467# - The number of input bytes to complete the character can be
1468# undetermined until the last input byte is received.
1469# - The number of input bytes can vary depending on previous input.
1470# - A single input byte can correspond to many characters of output.
1471# - The number of output characters can be undetermined until the
1472# last input byte is received.
1473# - The number of output characters can vary depending on previous input.
1474
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    A stateful, buffering decoder for testing seek/tell behavior.

    The input is a stream of words.  A word is either fixed-length (its
    length is the current input length I) or, when I is 0, variable-length
    and terminated by a period; in that mode empty words (extra periods)
    are skipped.  Words are interpreted as follows:
      - 'i' followed by a number sets the input length I (capped at 99);
        a missing number means 0.
      - 'o' followed by a number sets the output length O (capped at 99);
        a missing number means 0.
      - Any other word is emitted followed by a period.  If O is non-zero
        the word is first padded with hyphens or truncated so that it is
        exactly O characters long; if O is 0 it is emitted verbatim.
    I and O both start at 1.  A change of I applies to the bytes that
    follow it.  EOF (final=True) also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % (id(self),)

    def reset(self):
        # Back to the initial state: I = O = 1 and nothing buffered.
        self.buffer = bytearray()
        self.i = 1
        self.o = 1

    def getstate(self):
        # I and O are each XOR-ed with 1 so that flags = 0 after reset(),
        # then packed into one integer as I*100 + O.
        packed_i = self.i ^ 1
        packed_o = self.o ^ 1
        return bytes(self.buffer), packed_i * 100 + packed_o

    def setstate(self, state):
        # Inverse of getstate(): unpack the flags and undo the XOR.
        pending, flags = state
        self.buffer = bytearray(pending)
        packed_i, packed_o = divmod(flags, 100)
        self.i = packed_i ^ 1
        self.o = packed_o ^ 1

    def decode(self, input, final=False):
        pieces = []
        period = ord('.')
        for byte in input:
            if self.i:
                # fixed-length, terminate after self.i bytes
                self.buffer.append(byte)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
            elif byte != period:
                # variable-length: keep collecting until a period
                self.buffer.append(byte)
            elif self.buffer:
                # a period ends the word; periods with nothing buffered
                # are ignored
                pieces.append(self.process_word())
        if final and self.buffer:
            # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        # Interpret the buffered word, then empty the buffer.  Control
        # words ('i...' / 'o...') produce no output.
        result = ''
        lead = self.buffer[0]
        if lead == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif lead == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            result = self.buffer.decode('ascii')
            if self.o:
                # pad out with hyphens, then truncate to output length
                result = result.ljust(self.o, '-')[:self.o]
            result += '.'
        self.buffer = bytearray()
        return result

    # The codec search function below answers only while this is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        # Codec search function: returns None unless the test codec is
        # enabled and asked for by name.
        if not (cls.codecEnabled and name == 'test_decoder'):
            return None
        latin1 = codecs.lookup('latin-1')
        return codecs.CodecInfo(
            name='test_decoder',
            encode=latin1.encode,
            decode=None,
            incrementalencoder=None,
            streamreader=None,
            streamwriter=None,
            incrementaldecoder=cls)
1559
# Register the previous decoder's search function for testing.
# Disabled by default: the function returns nothing until a test sets
# StatefulIncrementalDecoder.codecEnabled.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1563
1564
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag passed to decode(), expected
    # decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for input, eof, output in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(input, eof), output)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', True), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001607
1608class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001609
def setUp(self):
    # Fixture: bytes mixing "\r\n", "\r" and "\n" line endings, and the
    # same text with every line ending written as "\n".
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = "AAA\nBBB\nCCC\nDDD\nEEE\n"
    support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001614
def tearDown(self):
    # Remove the scratch file that tests may have created.
    support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001617
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001618 def test_constructor(self):
1619 r = self.BytesIO(b"\xc3\xa9\n\n")
1620 b = self.BufferedReader(r, 1000)
1621 t = self.TextIOWrapper(b)
1622 t.__init__(b, encoding="latin1", newline="\r\n")
1623 self.assertEquals(t.encoding, "latin1")
1624 self.assertEquals(t.line_buffering, False)
1625 t.__init__(b, encoding="utf8", line_buffering=True)
1626 self.assertEquals(t.encoding, "utf8")
1627 self.assertEquals(t.line_buffering, True)
1628 self.assertEquals("\xe9\n", t.readline())
1629 self.assertRaises(TypeError, t.__init__, b, newline=42)
1630 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1631
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001632 def test_detach(self):
1633 r = self.BytesIO()
1634 b = self.BufferedWriter(r)
1635 t = self.TextIOWrapper(b)
1636 self.assertIs(t.detach(), b)
1637
1638 t = self.TextIOWrapper(b, encoding="ascii")
1639 t.write("howdy")
1640 self.assertFalse(r.getvalue())
1641 t.detach()
1642 self.assertEqual(r.getvalue(), b"howdy")
1643 self.assertRaises(ValueError, t.detach)
1644
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001645 def test_repr(self):
1646 raw = self.BytesIO("hello".encode("utf-8"))
1647 b = self.BufferedReader(raw)
1648 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001649 modname = self.TextIOWrapper.__module__
1650 self.assertEqual(repr(t),
1651 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1652 raw.name = "dummy"
1653 self.assertEqual(repr(t),
1654 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1655 raw.name = b"dummy"
1656 self.assertEqual(repr(t),
1657 "<%s.TextIOWrapper name=b'dummy' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001658
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001659 def test_line_buffering(self):
1660 r = self.BytesIO()
1661 b = self.BufferedWriter(r, 1000)
1662 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001663 t.write("X")
1664 self.assertEquals(r.getvalue(), b"") # No flush happened
1665 t.write("Y\nZ")
1666 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
1667 t.write("A\rB")
1668 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
1669
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001670 def test_encoding(self):
1671 # Check the encoding attribute is always set, and valid
1672 b = self.BytesIO()
1673 t = self.TextIOWrapper(b, encoding="utf8")
1674 self.assertEqual(t.encoding, "utf8")
1675 t = self.TextIOWrapper(b)
Georg Brandlab91fde2009-08-13 08:51:18 +00001676 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001677 codecs.lookup(t.encoding)
1678
1679 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001680 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001681 b = self.BytesIO(b"abc\n\xff\n")
1682 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001683 self.assertRaises(UnicodeError, t.read)
1684 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001685 b = self.BytesIO(b"abc\n\xff\n")
1686 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001687 self.assertRaises(UnicodeError, t.read)
1688 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001689 b = self.BytesIO(b"abc\n\xff\n")
1690 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001691 self.assertEquals(t.read(), "abc\n\n")
1692 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001693 b = self.BytesIO(b"abc\n\xff\n")
1694 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001695 self.assertEquals(t.read(), "abc\n\ufffd\n")
1696
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001697 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001698 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001699 b = self.BytesIO()
1700 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001701 self.assertRaises(UnicodeError, t.write, "\xff")
1702 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001703 b = self.BytesIO()
1704 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001705 self.assertRaises(UnicodeError, t.write, "\xff")
1706 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001707 b = self.BytesIO()
1708 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001709 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001710 t.write("abc\xffdef\n")
1711 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001712 self.assertEquals(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001713 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001714 b = self.BytesIO()
1715 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001716 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001717 t.write("abc\xffdef\n")
1718 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001719 self.assertEquals(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001720
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001721 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001722 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1723
1724 tests = [
1725 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001726 [ '', input_lines ],
1727 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1728 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1729 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001730 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001731 encodings = (
1732 'utf-8', 'latin-1',
1733 'utf-16', 'utf-16-le', 'utf-16-be',
1734 'utf-32', 'utf-32-le', 'utf-32-be',
1735 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001736
Guido van Rossum8358db22007-08-18 21:39:55 +00001737 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001738 # character in TextIOWrapper._pending_line.
1739 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001740 # XXX: str.encode() should return bytes
1741 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001742 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001743 for bufsize in range(1, 10):
1744 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001745 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1746 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001747 encoding=encoding)
1748 if do_reads:
1749 got_lines = []
1750 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001751 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001752 if c2 == '':
1753 break
1754 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001755 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001756 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001757 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001758
1759 for got_line, exp_line in zip(got_lines, exp_lines):
1760 self.assertEquals(got_line, exp_line)
1761 self.assertEquals(len(got_lines), len(exp_lines))
1762
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001763 def test_newlines_input(self):
1764 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001765 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1766 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001767 (None, normalized.decode("ascii").splitlines(True)),
1768 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001769 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1770 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1771 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001772 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001773 buf = self.BytesIO(testdata)
1774 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +00001775 self.assertEquals(txt.readlines(), expected)
1776 txt.seek(0)
1777 self.assertEquals(txt.read(), "".join(expected))
1778
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001779 def test_newlines_output(self):
1780 testdict = {
1781 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1782 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1783 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1784 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1785 }
1786 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1787 for newline, expected in tests:
1788 buf = self.BytesIO()
1789 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1790 txt.write("AAA\nB")
1791 txt.write("BB\nCCC\n")
1792 txt.write("X\rY\r\nZ")
1793 txt.flush()
1794 self.assertEquals(buf.closed, False)
1795 self.assertEquals(buf.getvalue(), expected)
1796
def test_destructor(self):
    # Dropping the last reference to a wrapper must close the underlying
    # stream, with the pending text already written to it.
    seen = []
    base = self.BytesIO
    class MyBytesIO(base):
        def close(self):
            # Record the contents at the moment of closing.
            seen.append(self.getvalue())
            base.close(self)
    raw = MyBytesIO()
    t = self.TextIOWrapper(raw, encoding="ascii")
    t.write("abc")
    del t
    support.gc_collect()
    self.assertEqual([b"abc"], seen)
1810
def test_override_destructor(self):
    # A subclass overriding __del__, close() and flush() must have them
    # called in that order when the object is collected.
    calls = []
    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # The parent class may not define __del__ at all.
            try:
                parent_del = super().__del__
            except AttributeError:
                return
            parent_del()
        def close(self):
            calls.append(2)
            super().close()
        def flush(self):
            calls.append(3)
            super().flush()
    raw = self.BytesIO()
    wrapper = MyTextIO(raw, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
1833
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()
    def f():
        # The attribute lookup fails while the temporary wrapper is the
        # only reference to itself.
        self.TextIOWrapper(rawio).xyzzy
    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, f)
    message = captured.getvalue().strip()
    if not message:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(message.splitlines()), 1)
    self.assertTrue(message.startswith("Exception IOError: "), message)
    self.assertTrue(message.endswith(" ignored"), message)
Guido van Rossum8358db22007-08-18 21:39:55 +00001848
Guido van Rossum9b76da62007-04-11 01:09:03 +00001849 # Systematic tests of the text I/O API
1850
def test_basic_io(self):
    # Write, read back, seek and append through a text file for a range
    # of chunk sizes and encodings.  The files are opened with "with" so
    # they are closed even when an assertion fails.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        # "utf-16-be" and "utf-16-le" are not exercised here.
        for enc in "ascii", "latin1", "utf8":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                cookie = f.tell()  # position at end of file
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
1879
def multi_line_test(self, f, enc):
    # Helper: empty f, write lines of various lengths containing non-ASCII
    # characters while recording tell() before each one, then read them
    # back and check that every (position, line) pair matches.
    # The enc argument is accepted but not used.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    wlines = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to build `size` characters.
        line = "".join(sample[i % len(sample)] for i in range(size)) + "\n"
        wlines.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    rlines = []
    while True:
        pos = f.tell()
        line = f.readline()
        if not line:
            break
        rlines.append((pos, line))
    self.assertEqual(rlines, wlines)
1901
def test_telling(self):
    # tell() must report the same positions when reading lines back as it
    # did while writing them, and must raise while the file is iterated.
    with self.open(support.TESTFN, "w+", encoding="utf8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            self.assertRaises(IOError, f.tell)
        # Once the iteration is over, tell() works again.
        self.assertEqual(f.tell(), p2)
1921
def test_seeking(self):
    # Each line ends with a multi-byte character that starts two bytes
    # before the value returned by _default_chunk_size(); check that
    # tell() and readline() stay consistent there.
    chunk_size = _default_chunk_size()
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = u_suffix.encode("utf-8")
    line = prefix + suffix
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    # Use "with" so the read handle is closed as well.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        s = f.read(prefix_size)
        self.assertEqual(s, str(prefix, "ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
1939
def test_seeking_too(self):
    # Regression test for a specific bug
    data = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as f:
        f.write(data)
    # Use "with" so the read handle is closed as well.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        # Passes as long as tell() does not raise.
        f.tell()
1951
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def check_seek_and_tell(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                # "with" closes the file even when an assertion fails.
                with self.open(support.TESTFN, encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = True

    # Run the tests.
    try:
        # Try each test case.
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            check_seek_and_tell(data)

        # Position each test case so that it crosses a chunk boundary.
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(data)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            check_seek_and_tell(prefix + data, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = False
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001998
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001999 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002000 data = "1234567890"
2001 tests = ("utf-16",
2002 "utf-16-le",
2003 "utf-16-be",
2004 "utf-32",
2005 "utf-32-le",
2006 "utf-32-be")
2007 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002008 buf = self.BytesIO()
2009 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002010 # Check if the BOM is written only once (see issue1753).
2011 f.write(data)
2012 f.write(data)
2013 f.seek(0)
2014 self.assertEquals(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002015 f.seek(0)
2016 self.assertEquals(f.read(), data * 2)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002017 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
2018
Benjamin Petersona1b49012009-03-31 23:11:32 +00002019 def test_unreadable(self):
2020 class UnReadable(self.BytesIO):
2021 def readable(self):
2022 return False
2023 txt = self.TextIOWrapper(UnReadable())
2024 self.assertRaises(IOError, txt.read)
2025
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002026 def test_read_one_by_one(self):
2027 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002028 reads = ""
2029 while True:
2030 c = txt.read(1)
2031 if not c:
2032 break
2033 reads += c
2034 self.assertEquals(reads, "AA\nBB")
2035
Benjamin Peterson6b59f772009-12-13 19:30:15 +00002036 def test_readlines(self):
2037 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2038 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2039 txt.seek(0)
2040 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2041 txt.seek(0)
2042 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2043
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002044 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002045 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002046 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002047 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002048 reads = ""
2049 while True:
2050 c = txt.read(128)
2051 if not c:
2052 break
2053 reads += c
2054 self.assertEquals(reads, "A"*127+"\nB")
2055
2056 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002057 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002058
2059 # read one char at a time
2060 reads = ""
2061 while True:
2062 c = txt.read(1)
2063 if not c:
2064 break
2065 reads += c
2066 self.assertEquals(reads, self.normalized)
2067
2068 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002069 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002070 txt._CHUNK_SIZE = 4
2071
2072 reads = ""
2073 while True:
2074 c = txt.read(4)
2075 if not c:
2076 break
2077 reads += c
2078 self.assertEquals(reads, self.normalized)
2079
2080 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002081 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002082 txt._CHUNK_SIZE = 4
2083
2084 reads = txt.read(4)
2085 reads += txt.read(4)
2086 reads += txt.readline()
2087 reads += txt.readline()
2088 reads += txt.readline()
2089 self.assertEquals(reads, self.normalized)
2090
2091 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002092 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002093 txt._CHUNK_SIZE = 4
2094
2095 reads = txt.read(4)
2096 reads += txt.read()
2097 self.assertEquals(reads, self.normalized)
2098
2099 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002100 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002101 txt._CHUNK_SIZE = 4
2102
2103 reads = txt.read(4)
2104 pos = txt.tell()
2105 txt.seek(0)
2106 txt.seek(pos)
2107 self.assertEquals(txt.read(4), "BBB\n")
2108
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002109 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002110 buffer = self.BytesIO(self.testdata)
2111 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002112
2113 self.assertEqual(buffer.seekable(), txt.seekable())
2114
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaa'.encode(charset))

        with self.open(filename, 'a', encoding=charset) as f:
            f.write('xxx')
        # Encoding the whole text at once yields exactly one BOM.
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
2129
def test_seek_bom(self):
    # Same check as test_append_bom, but positioning manually with seek()
    # instead of opening the file in append mode.
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            # Remember the end-of-data position for the second pass.
            pos = f.tell()
        with self.open(filename, 'r+', encoding=charset) as f:
            # Write past the existing text first, then overwrite the
            # start of the file.
            f.seek(pos)
            f.write('zzz')
            f.seek(0)
            f.write('bbb')
        with self.open(filename, 'rb') as f:
            # assertEqual is used instead of the deprecated assertEquals.
            self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
2144
def test_errors_property(self):
    # The ``errors`` attribute reports "strict" when no handler is given
    # and otherwise the handler name passed to open().
    cases = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for extra_kwargs, expected in cases:
        with self.open(support.TESTFN, "w", **extra_kwargs) as text_file:
            self.assertEqual(text_file.errors, expected)
2150
Antoine Pitroue4501852009-05-14 18:55:55 +00002151
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    event = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def run(n):
            text = "Thread%03d\n" % n
            # Block until every thread has been started so that the
            # writes below happen as concurrently as possible.
            event.wait()
            f.write(text)
        # Pass the thread number through ``args`` rather than wrapping
        # ``run`` in a default-argument lambda.
        threads = [threading.Thread(target=run, args=(x,))
                   for x in range(20)]
        for t in threads:
            t.start()
        # Give the threads a moment to reach event.wait().
        time.sleep(0.02)
        event.set()
        for t in threads:
            t.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
        for n in range(20):
            # Every line must appear exactly once: neither lost nor
            # duplicated.
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
2172
Antoine Pitroufaf90072010-05-03 16:58:19 +00002173 def test_flush_error_on_close(self):
2174 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2175 def bad_flush():
2176 raise IOError()
2177 txt.flush = bad_flush
2178 self.assertRaises(IOError, txt.close) # exception not swallowed
2179
2180 def test_multi_close(self):
2181 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2182 txt.close()
2183 txt.close()
2184 txt.close()
2185 self.assertRaises(ValueError, txt.flush)
2186
class CTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapper tests that apply only to the C implementation."""

    def test_initialization(self):
        # Re-running __init__ with an invalid ``newline`` must raise, and
        # the wrapper must then refuse to read.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        bad_newlines = ((TypeError, 42), (ValueError, 'xyzzy'))
        for expected_error, newline in bad_newlines:
            with self.assertRaises(expected_error):
                wrapper.__init__(buffered, newline=newline)
            with self.assertRaises(ValueError):
                wrapper.read()

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        raw_file = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(raw_file)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        # Create a reference cycle so that only the cyclic collector can
        # reclaim the wrapper.
        wrapper.x = wrapper
        ref = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertTrue(ref() is None, ref)
        with self.open(support.TESTFN, "rb") as written:
            self.assertEqual(written.read(), b"456def")
2213
class PyTextIOWrapperTest(TextIOWrapperTest):
    """Inherits every TextIOWrapperTest case unchanged; adds none of its own."""
2216
2217
2218class IncrementalNewlineDecoderTest(unittest.TestCase):
2219
2220 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002221 # UTF-8 specific tests for a newline decoder
2222 def _check_decode(b, s, **kwargs):
2223 # We exercise getstate() / setstate() as well as decode()
2224 state = decoder.getstate()
2225 self.assertEquals(decoder.decode(b, **kwargs), s)
2226 decoder.setstate(state)
2227 self.assertEquals(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002228
Antoine Pitrou180a3362008-12-14 16:36:46 +00002229 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002230
Antoine Pitrou180a3362008-12-14 16:36:46 +00002231 _check_decode(b'\xe8', "")
2232 _check_decode(b'\xa2', "")
2233 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002234
Antoine Pitrou180a3362008-12-14 16:36:46 +00002235 _check_decode(b'\xe8', "")
2236 _check_decode(b'\xa2', "")
2237 _check_decode(b'\x88', "\u8888")
2238
2239 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002240 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2241
Antoine Pitrou180a3362008-12-14 16:36:46 +00002242 decoder.reset()
2243 _check_decode(b'\n', "\n")
2244 _check_decode(b'\r', "")
2245 _check_decode(b'', "\n", final=True)
2246 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002247
Antoine Pitrou180a3362008-12-14 16:36:46 +00002248 _check_decode(b'\r', "")
2249 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002250
Antoine Pitrou180a3362008-12-14 16:36:46 +00002251 _check_decode(b'\r\r\n', "\n\n")
2252 _check_decode(b'\r', "")
2253 _check_decode(b'\r', "\n")
2254 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002255
Antoine Pitrou180a3362008-12-14 16:36:46 +00002256 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2257 _check_decode(b'\xe8\xa2\x88', "\u8888")
2258 _check_decode(b'\n', "\n")
2259 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2260 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002261
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002262 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002263 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002264 if encoding is not None:
2265 encoder = codecs.getincrementalencoder(encoding)()
2266 def _decode_bytewise(s):
2267 # Decode one byte at a time
2268 for b in encoder.encode(s):
2269 result.append(decoder.decode(bytes([b])))
2270 else:
2271 encoder = None
2272 def _decode_bytewise(s):
2273 # Decode one char at a time
2274 for c in s:
2275 result.append(decoder.decode(c))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002276 self.assertEquals(decoder.newlines, None)
2277 _decode_bytewise("abc\n\r")
2278 self.assertEquals(decoder.newlines, '\n')
2279 _decode_bytewise("\nabc")
2280 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2281 _decode_bytewise("abc\r")
2282 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2283 _decode_bytewise("abc")
2284 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
2285 _decode_bytewise("abc\r")
2286 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
2287 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002288 input = "abc"
2289 if encoder is not None:
2290 encoder.reset()
2291 input = encoder.encode(input)
2292 self.assertEquals(decoder.decode(input), "abc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002293 self.assertEquals(decoder.newlines, None)
2294
2295 def test_newline_decoder(self):
2296 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002297 # None meaning the IncrementalNewlineDecoder takes unicode input
2298 # rather than bytes input
2299 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002300 'utf-16', 'utf-16-le', 'utf-16-be',
2301 'utf-32', 'utf-32-le', 'utf-32-be',
2302 )
2303 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002304 decoder = enc and codecs.getincrementaldecoder(enc)()
2305 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2306 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002307 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002308 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2309 self.check_newline_decoding_utf8(decoder)
2310
Antoine Pitrou66913e22009-03-06 23:40:56 +00002311 def test_newline_bytes(self):
2312 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2313 def _check(dec):
2314 self.assertEquals(dec.newlines, None)
2315 self.assertEquals(dec.decode("\u0D00"), "\u0D00")
2316 self.assertEquals(dec.newlines, None)
2317 self.assertEquals(dec.decode("\u0A00"), "\u0A00")
2318 self.assertEquals(dec.newlines, None)
2319 dec = self.IncrementalNewlineDecoder(None, translate=False)
2320 _check(dec)
2321 dec = self.IncrementalNewlineDecoder(None, translate=True)
2322 _check(dec)
2323
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Inherits every IncrementalNewlineDecoderTest case; adds none of its own."""
2326
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Inherits every IncrementalNewlineDecoderTest case; adds none of its own."""
Antoine Pitrou180a3362008-12-14 16:36:46 +00002329
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002330
Guido van Rossum01a27522007-03-07 01:00:12 +00002331# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002332
class MiscIOTest(unittest.TestCase):
    """Miscellaneous checks on an io implementation's public surface.

    The module under test is read from ``self.io``; names such as
    ``self.open`` and ``self.IOBase`` are expected to be set on the class
    before the tests run.
    """

    def tearDown(self):
        support.unlink(support.TESTFN)

    def test___all__(self):
        # Every exported name must exist and fall into one of the known
        # categories: open(), an exception, a SEEK_* constant or an
        # IOBase subclass.
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertTrue(obj is not None, name)
            if name == "open":
                continue
            elif "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase))

    def test_attributes(self):
        # ``with`` blocks make sure the files are closed even when an
        # assertion fails part-way through.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        with self.open(support.TESTFN, "U") as f:
            self.assertEqual(f.name, support.TESTFN)
            self.assertEqual(f.buffer.name, support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode, "U")
            self.assertEqual(f.buffer.mode, "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        with self.open(support.TESTFN, "w+") as f:
            self.assertEqual(f.mode, "w+")
            self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
            self.assertEqual(f.buffer.raw.mode, "rb+")

            # Open a second file object on the same descriptor without
            # taking ownership of it (closefd=False).
            with self.open(f.fileno(), "wb", closefd=False) as g:
                self.assertEqual(g.mode, "wb")
                self.assertEqual(g.raw.mode, "wb")
                self.assertEqual(g.name, f.fileno())
                self.assertEqual(g.raw.name, f.fileno())

    def test_io_after_close(self):
        # Every I/O method must raise ValueError once the file is closed,
        # whatever combination of mode and buffering it was opened with.
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            # peek/read1/readinto exist only on some of the layers.
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            # Pass bytes or str according to the mode of the file.
            self.assertRaises(ValueError, f.write,
                              b"" if "b" in kwargs['mode'] else "")
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        self.assertRaises(TypeError, self.BlockingIOError)
        self.assertRaises(TypeError, self.BlockingIOError, 1)
        self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
        self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
        b = self.BlockingIOError(1, "")
        self.assertEqual(b.characters_written, 0)
        class C(str):
            pass
        c = C("")
        b = self.BlockingIOError(1, c)
        # Build a reference cycle between the exception and its argument
        # and check that the garbage collector can reclaim it.
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertTrue(wr() is None, wr)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        self.assertTrue(isinstance(self.IOBase, abc.ABCMeta))
        self.assertTrue(isinstance(self.RawIOBase, abc.ABCMeta))
        self.assertTrue(isinstance(self.BufferedIOBase, abc.ABCMeta))
        self.assertTrue(isinstance(self.TextIOBase, abc.ABCMeta))

    def _check_abc_inheritance(self, abcmodule):
        # Check that unbuffered, buffered and text files are instances of
        # exactly the matching ABCs taken from ``abcmodule`` (any object
        # exposing IOBase, RawIOBase, BufferedIOBase and TextIOBase).
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertTrue(isinstance(f, abcmodule.IOBase))
            self.assertTrue(isinstance(f, abcmodule.RawIOBase))
            self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
            self.assertFalse(isinstance(f, abcmodule.TextIOBase))
        with self.open(support.TESTFN, "wb") as f:
            self.assertTrue(isinstance(f, abcmodule.IOBase))
            self.assertFalse(isinstance(f, abcmodule.RawIOBase))
            self.assertTrue(isinstance(f, abcmodule.BufferedIOBase))
            self.assertFalse(isinstance(f, abcmodule.TextIOBase))
        with self.open(support.TESTFN, "w") as f:
            self.assertTrue(isinstance(f, abcmodule.IOBase))
            self.assertFalse(isinstance(f, abcmodule.RawIOBase))
            self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
            self.assertTrue(isinstance(f, abcmodule.TextIOBase))

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs.  The
        # test case itself is passed as the "module" because it carries
        # the ABCs as attributes.
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)
2468
class CMiscIOTest(MiscIOTest):
    """Runs the MiscIOTest cases with ``self.io`` bound to the ``io`` module."""

    io = io
2471
class PyMiscIOTest(MiscIOTest):
    """Runs the MiscIOTest cases with ``self.io`` bound to ``pyio``."""

    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002474
Antoine Pitrou16b11de2010-08-21 19:17:57 +00002475
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Checks that signal handlers run when a blocking write is interrupted.

    The io implementation under test is read from ``self.io``, which
    concrete subclasses must provide.
    """

    def setUp(self):
        # Install the raising handler and remember the previous one.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Deliberately raise ZeroDivisionError so the test can detect
        # that the handler actually ran.
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler.

        ``item`` is the str or bytes unit that gets repeated to build the
        payload, ``bytes`` is the byte sequence expected to come out of
        the pipe, and ``fdopen_kwargs`` are passed to ``self.io.open()``
        for the write end of the pipe.
        """
        read_results = []
        def _read():
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        # Bound up front so the cleanup below cannot hit an unbound name
        # (and mask the real error) if open() itself raises.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            signal.alarm(1)
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed above.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            self.assertRaises(ZeroDivisionError,
                              wio.write, item * (1024 * 1024))
            t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            # Cancel a still-pending alarm so it cannot fire after
            # tearDown() has restored the previous handler.
            signal.alarm(0)
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2536
class CSignalsTest(SignalsTest):
    """Runs the SignalsTest cases with ``self.io`` bound to the ``io`` module."""

    io = io
2539
class PySignalsTest(SignalsTest):
    """Runs the SignalsTest cases with ``self.io`` bound to ``pyio``."""

    io = pyio
2542
2543
def test_main():
    """Bind each test class to its io implementation and run them all.

    Classes whose name starts with "C" receive the names of the ``io``
    module plus the "C"-prefixed mock classes; classes whose name starts
    with "Py" receive the ``pyio`` equivalents.  Other classes are run
    unchanged.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = dict((member, getattr(io, member)) for member in all_members)
    py_io_ns = dict((member, getattr(pyio, member)) for member in all_members)
    module_scope = globals()
    # Each mock is published under its plain name but resolved to the
    # implementation-specific class ("C..." or "Py...") defined in this module.
    for prefix, namespace in (("C", c_io_ns), ("Py", py_io_ns)):
        for mock in mocks:
            namespace[mock.__name__] = module_scope[prefix + mock.__name__]
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper

    for test in tests:
        class_name = test.__name__
        if class_name.startswith("C"):
            injected = c_io_ns
        elif class_name.startswith("Py"):
            injected = py_io_ns
        else:
            # Implementation-independent test class: nothing to inject.
            continue
        for attr_name, attr_value in injected.items():
            setattr(test, attr_name, attr_value)

    support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002578
if __name__ == "__main__":
    # Run the io test suite when this file is executed as a script.
    test_main()