blob: 5f62494e63ca41bcccdd22208668f37e74816c27 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import threading
27import random
Guido van Rossum28524c72007-02-27 05:47:44 +000028import unittest
Benjamin Peterson59406a92009-03-26 17:10:29 +000029import warnings
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000030import weakref
31import gc
32import abc
Antoine Pitrou16b11de2010-08-21 19:17:57 +000033import signal
34import errno
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000035from itertools import chain, cycle, count
36from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000038
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000039import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000040import io # C implementation of io
41import _pyio as pyio # Python implementation of io
Guido van Rossum28524c72007-02-27 05:47:44 +000042
Guido van Rossuma9e20242007-03-08 00:43:48 +000043
def _default_chunk_size():
    """Return the ``_CHUNK_SIZE`` attribute of a text file opened via open()."""
    # This module's own source file is used simply as a file to open in
    # text mode; its contents are never read.
    with open(__file__, "r", encoding="latin1") as stream:
        chunk_size = stream._CHUNK_SIZE
    return chunk_size
48
49
class MockRawIOWithoutRead:
    """Scripted raw stream that deliberately lacks a read() method.

    Only readinto() is provided, so that the default RawIO.read() (which
    calls readinto()) gets exercised.  Each entry of *read_stack* is served
    by successive readinto() calls; a ``None`` entry makes readinto()
    return ``None``.  Everything passed to write() is recorded in
    ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        # Counters that tests inspect: total readinto() calls, and calls
        # made after the scripted data ran out.
        self._reads = 0
        self._extraneous_reads = 0

    def write(self, b):
        # Keep an immutable copy of what was written and claim full success.
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        self._reads += 1
        capacity = len(buf)
        if not self._read_stack:
            # Script exhausted: count the surplus call and report EOF.
            self._extraneous_reads += 1
            return 0
        chunk = self._read_stack[0]
        if chunk is None:
            self._read_stack.pop(0)
            return None
        size = len(chunk)
        if size > capacity:
            # Serve what fits and leave the remainder for the next call.
            buf[:] = chunk[:capacity]
            self._read_stack[0] = chunk[capacity:]
            return capacity
        self._read_stack.pop(0)
        buf[:size] = chunk
        return size

    def truncate(self, pos=None):
        return pos
105
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead combined with the C ``io.RawIOBase``."""
108
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead combined with the pure-Python ``pyio.RawIOBase``."""
111
112
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus an explicit, scripted read()."""

    def read(self, n=None):
        """Return the next scripted chunk, ignoring *n*.

        Once the script is exhausted the call is counted in
        ``_extraneous_reads`` and ``b""`` is returned.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted script means EOF; anything else (for
            # instance KeyboardInterrupt) must propagate rather than be
            # mistaken for end of data.
            self._extraneous_reads += 1
            return b""
122
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO combined with the C ``io.RawIOBase``."""
125
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO combined with the pure-Python ``pyio.RawIOBase``."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000128
Guido van Rossuma9e20242007-03-08 00:43:48 +0000129
class MisbehavedRawIO(MockRawIO):
    """MockRawIO whose return values do not match what actually happened."""

    def write(self, b):
        # Claim twice as many bytes as the parent reports.
        reported = super().write(b)
        return reported * 2

    def read(self, n=None):
        # Hand back the scripted chunk repeated twice.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Let the parent fill the buffer, then report five times its length.
        super().readinto(buf)
        capacity = len(buf)
        return capacity * 5
146
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO combined with the C ``io.RawIOBase``."""
149
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO combined with the pure-Python ``pyio.RawIOBase``."""
152
153
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() marks it closed and raises IOError."""

    closed = 0

    def close(self):
        # Later calls are silent no-ops; only the first one fails.
        if self.closed:
            return
        self.closed = 1
        raise IOError
161
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO combined with the C ``io.RawIOBase``."""
164
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO combined with the pure-Python ``pyio.RawIOBase``."""
167
168
class MockFileIO:
    """Mixin that logs the result size of every read() and readinto().

    Meant to precede a concrete stream class in the MRO; the actual I/O is
    delegated through super().  ``read_history`` receives one entry per
    call: the length of the data read, or ``None`` when the underlying
    call returned ``None``.
    """

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        if chunk is None:
            size = None
        else:
            size = len(chunk)
        self.read_history.append(size)
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000184
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO bookkeeping on top of the C ``io.BytesIO``."""
Guido van Rossuma9e20242007-03-08 00:43:48 +0000187
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO bookkeeping on top of the pure-Python ``pyio.BytesIO``."""
190
191
class MockNonBlockWriterIO:
    """Writer mixin that can simulate a write blocking part-way through.

    Written data accumulates in ``_write_stack``.  After block_on(char),
    the next write() whose data contains *char* records only the bytes
    before it and raises ``self.BlockingIOError``; the concrete subclass
    must provide that attribute.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        # Return everything written so far and empty the log in place.
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        payload = bytes(b)
        blocker = self._blocker_char
        if blocker:
            cut = payload.find(blocker)
            if cut != -1:
                # One-shot: disarm, keep the prefix, and report how many
                # bytes were accepted through the exception's third argument.
                self._blocker_char = None
                self._write_stack.append(payload[:cut])
                raise self.BlockingIOError(0, "test blocking", cut)
        self._write_stack.append(payload)
        return len(payload)
230
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO wired to the C ``io`` module."""

    # Exception type raised by the simulated blocking write.
    BlockingIOError = io.BlockingIOError
233
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO wired to the pure-Python ``pyio`` module."""

    # Exception type raised by the simulated blocking write.
    BlockingIOError = pyio.BlockingIOError
236
Guido van Rossuma9e20242007-03-08 00:43:48 +0000237
class IOTest(unittest.TestCase):
    """General tests for opening, reading, writing and closing files.

    The tests reach the implementation under test through attributes such
    as ``self.open``, ``self.FileIO``, ``self.BytesIO``, ``self.IOBase``,
    ``self.RawIOBase``, ``self.BufferedIOBase``, ``self.TextIOBase`` and
    ``self.MockRawIOWithoutRead``.
    NOTE(review): none of these are defined in this class; presumably they
    are set on the C/Py subclasses elsewhere in the file -- confirm.
    """

    def setUp(self):
        # Start every test without a leftover scratch file.
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        # Helper: run a fixed write/seek/truncate sequence against the
        # writable stream f, checking every return value.
        self.assertEqual(f.write(b"blah."), 5)
        f.truncate(0)
        # truncate() is expected to leave the stream position untouched.
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        # Helper: run a fixed read/readinto/seek sequence against a stream
        # containing b"hello world\n".  With buffered=True, argument-less
        # read() calls are checked as well.
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by the large-file tests.
    LARGE = 2**31

    def large_file_ops(self, f):
        # Helper: seek/write/truncate around the LARGE offset on f.
        # assertTrue() rather than a bare assert, so the preconditions are
        # still checked when Python runs with -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(IOError, fp.read)
                self.assertRaises(IOError, fp.readline)
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(IOError, fp.write, b"blah")
            self.assertRaises(IOError, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(IOError, fp.write, "blah")
            self.assertRaises(IOError, fp.writelines, ["blah\n"])

    def test_raw_file_io(self):
        # Unbuffered (buffering=0) binary files: capabilities plus the
        # write_ops/read_ops sequences.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        # Same as test_raw_file_io, with default buffering.
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        # readline() with no limit, explicit limits and None; a float
        # limit must be rejected in both binary and text mode.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        # Run the write and read sequences against in-memory BytesIO.
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                # Report a real skip (as test_unbounded_file does) instead
                # of printing and returning, which counted as a pass.
                self.skipTest(
                    "large file ops skipped on %s: requires %d bytes and a "
                    "long time; use 'regrtest.py -u largefile test_io' to "
                    "run it" % (sys.platform, self.LARGE))
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # The with statement must close the file on both normal and
        # exceptional exit, for several buffer sizes.
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        # Dropping the last reference to a FileIO subclass must run
        # __del__, close() and flush() in that order and leave the written
        # data on disk.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        # Helper: subclass *base*, destroy an instance, and check that
        # __del__, close() and flush() ran in that order.
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        # Data written before the with block exits must be readable back.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        # write() must accept an array and report its size in bytes.
        a = array.array('i', range(10))
        # Computed from itemsize rather than via array.tostring(), which is
        # a deprecated alias that newer Python versions no longer provide.
        n = len(a) * a.itemsize
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        # Reading from a closed file object wrapping a borrowed descriptor
        # (closefd=False) must raise ValueError.
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        # The raw layer's closefd attribute must mirror the open() argument.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        # Self-reference: the object now sits in a reference cycle.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed

    def test_multi_close(self):
        # close() may be called repeatedly; using the file afterwards fails.
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")
580
class CIOTest(IOTest):
    """IOTest variant for the C implementation.

    NOTE(review): the implementation attributes used by IOTest are not set
    here; presumably they are injected elsewhere in the file -- confirm.
    """
Guido van Rossuma9e20242007-03-08 00:43:48 +0000583
class PyIOTest(IOTest):
    """IOTest variant for the pure-Python implementation.

    NOTE(review): the implementation attributes used by IOTest are not set
    here; presumably they are injected elsewhere in the file -- confirm.
    """
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000586
Guido van Rossuma9e20242007-03-08 00:43:48 +0000587
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    #
    # Mixin: the concrete test class supplies self.tp (the buffered type
    # under test) together with self.MockRawIO and self.CloseFailureIO,
    # and must also derive from unittest.TestCase for the assert* methods.

    def test_detach(self):
        # detach() hands back the raw stream exactly once.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        self.assertIs(buf.detach(), raw)
        self.assertRaises(ValueError, buf.detach)

    def test_fileno(self):
        # fileno() is delegated to the raw stream (the mock returns 42).
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        self.assertEqual(42, bufio.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Invalid whence
        self.assertRaises(ValueError, bufio.seek, 0, -1)
        # 9 rather than 3: on platforms providing os.SEEK_DATA, 3 is a
        # legitimate whence value and is not rejected.
        self.assertRaises(ValueError, bufio.seek, 0, 9)

    def test_override_destructor(self):
        # Overridden __del__/close()/flush() must be invoked, in that
        # order, when the buffered object is destroyed; flush() is only
        # expected for writable objects.
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        if writable:
            self.assertEqual(record, [1, 2, 3])
        else:
            self.assertEqual(record, [1, 2])

    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        def _with():
            with bufio:
                pass
        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            self.tp(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception IOError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    def test_repr(self):
        # repr() shows the class name, plus the raw stream's name once the
        # raw stream has one (str or bytes).
        raw = self.MockRawIO()
        b = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(b), "<%s>" % clsname)
        raw.name = "dummy"
        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)

    def test_flush_error_on_close(self):
        raw = self.MockRawIO()
        def bad_flush():
            raise IOError()
        raw.flush = bad_flush
        b = self.tp(raw)
        self.assertRaises(IOError, b.close) # exception not swallowed

    def test_multi_close(self):
        # close() may be called repeatedly; flushing afterwards fails.
        raw = self.MockRawIO()
        b = self.tp(raw)
        b.close()
        b.close()
        b.close()
        self.assertRaises(ValueError, b.flush)

    def test_readonly_attributes(self):
        # The raw attribute cannot be rebound.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        x = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buf.raw = x
702
Guido van Rossum78892e42007-04-06 17:31:18 +0000703
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    """Tests for buffered readers, shared by all implementations.

    Concrete subclasses set ``tp`` to the type under test.  The mock raw
    streams (``self.MockRawIO`` and friends) and ``self.open`` are looked up
    on the instance; they are not defined in this class.

    Test methods are documented with comments rather than docstrings so that
    unittest's verbose output keeps showing the method names.
    """
    read_mode = "rb"

    def test_constructor(self):
        # __init__ may be called again on a live object, with or without an
        # explicit buffer size; non-positive sizes raise ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        # A successful re-initialization makes the object usable again.
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_read(self):
        # read(None) and read(7) both return the whole 7-byte stream.
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # read1() is checked against the mock's raw read counter: it serves
        # buffered data first and the counter grows by at most one per call.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        # readinto() fills the target from the start and returns the number
        # of bytes stored; bytes past that count are left untouched.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")

    def test_readlines(self):
        # Each call gets a fresh reader over the same three raw chunks.
        def bufio():
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes passed to bufio.read(), and the
        # expected sizes of the resulting raw reads.
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        # assertIsNone reports the offending value when the check fails.
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

    def test_read_past_eof(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            l = list(range(256)) * N
            random.shuffle(l)
            s = bytes(bytearray(l))
            with self.open(support.TESTFN, "wb") as f:
                f.write(s)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def read_worker():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            s = bufio.read(n)
                            if not s:
                                break
                            # list.append() is atomic
                            results.append(s)
                    except Exception as e:
                        # Record the failure for the main thread, then let
                        # the thread die with its traceback.
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=read_worker)
                           for _ in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                s = b''.join(results)
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(s.count(c), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            rawio = self.MockRawIO([b"x" * n])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            # Simple case: one raw read is enough to satisfy the request.
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
881
882
class CBufferedReaderTest(BufferedReaderTest):
    """Run the buffered reader tests against ``io.BufferedReader``.

    Also holds the checks that are only made for this implementation.
    """
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed __init__, read() must raise ValueError too.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # Opening with "w+b" creates TESTFN; remove it when the test ends so
        # it is not left behind for later tests.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Reference cycle: only the cyclic collector can free the object.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000923
class PyBufferedReaderTest(BufferedReaderTest):
    """Run the buffered reader tests against ``pyio.BufferedReader``."""
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000926
Guido van Rossuma9e20242007-03-08 00:43:48 +0000927
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    """Tests for buffered writers, shared by all implementations.

    Concrete subclasses set ``tp`` to the type under test.  The mock raw
    streams (``self.MockRawIO`` and friends) and ``self.open`` are looked up
    on the instance; they are not defined in this class.

    Test methods are documented with comments rather than docstrings so that
    unittest's verbose output keeps showing the method names.
    """
    write_mode = "wb"

    def test_constructor(self):
        # __init__ may be called again on a live object, with or without an
        # explicit buffer size; non-positive sizes raise ValueError.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        # A successful re-initialization makes the object usable again.
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # detach() must push the pending bytes to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # intermediate_func(bufio) is called after every write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seek away and back with absolute offsets between writes...
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        # ...then with relative ones.
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_destructor(self):
        # Dropping the last reference must flush the pending bytes.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # The test creates TESTFN; remove it when the test ends so it is not
        # left behind for later tests.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            # truncate() does not move the stream position.
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def write_worker():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: nothing left to write.
                                return
                            bufio.write(s)
                    except Exception as e:
                        # Record the failure for the main thread, then let
                        # the thread die with its traceback.
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=write_worker)
                           for _ in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Chunks may land in any order, so only the per-value byte
            # counts are checked.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing a third positional argument must emit exactly one
        # DeprecationWarning with the expected message.
        with support.check_warnings() as w:
            warnings.simplefilter("always", DeprecationWarning)
            self.tp(self.MockRawIO(), 8, 12)
            self.assertEqual(len(w.warnings), 1)
            warning = w.warnings[0]
            self.assertIs(warning.category, DeprecationWarning)
            self.assertEqual(str(warning.message),
                             "max_buffer_size is deprecated")
1147
1148
class CBufferedWriterTest(BufferedWriterTest):
    """Run the buffered writer tests against ``io.BufferedWriter``.

    Also holds the checks that are only made for this implementation.
    """
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed __init__, write() must raise ValueError too.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The test creates TESTFN; remove it when the test ends so it is not
        # left behind for later tests.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Reference cycle: only the cyclic collector can free the object.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1186
1187
class PyBufferedWriterTest(BufferedWriterTest):
    """Run the buffered writer tests against ``pyio.BufferedWriter``."""
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001190
class BufferedRWPairTest(unittest.TestCase):
    """Tests for the reader/writer pair type, shared by all implementations.

    Concrete subclasses set ``tp`` to the type under test; it is constructed
    as ``tp(reader, writer)``.  ``self.MockRawIO``, ``self.BytesIO`` and
    ``self.UnsupportedOperation`` are looked up on the instance; they are not
    defined in this class.
    """

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_detach(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_deprecation(self):
        # Passing a fourth positional argument must emit exactly one
        # DeprecationWarning with the expected message.
        with support.check_warnings() as w:
            warnings.simplefilter("always", DeprecationWarning)
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
            self.assertEqual(len(w.warnings), 1)
            warning = w.warnings[0]
            self.assertIs(warning.category, DeprecationWarning)
            self.assertEqual(str(warning.message),
                             "max_buffer_size is deprecated")

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        # Each call gets a fresh pair over the same data.  The three cases
        # mirror BufferedReaderTest.test_readlines: no hint, None, and a
        # size hint.
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # peek() may return more than requested, hence startswith();
        # the following read() shows it did not consume anything.
        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        # The pair reports a tty as soon as either side is one.
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001313
class CBufferedRWPairTest(BufferedRWPairTest):
    """Run the reader/writer pair tests against ``io.BufferedRWPair``."""
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001316
class PyBufferedRWPairTest(BufferedRWPairTest):
    """Run the reader/writer pair tests against ``pyio.BufferedRWPair``."""
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001319
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001320
1321class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1322 read_mode = "rb+"
1323 write_mode = "wb+"
1324
1325 def test_constructor(self):
1326 BufferedReaderTest.test_constructor(self)
1327 BufferedWriterTest.test_constructor(self)
1328
1329 def test_read_and_write(self):
1330 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001331 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001332
1333 self.assertEqual(b"as", rw.read(2))
1334 rw.write(b"ddd")
1335 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001336 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001337 self.assertEqual(b"ghjk", rw.read())
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001338 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001339
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001340 def test_seek_and_tell(self):
1341 raw = self.BytesIO(b"asdfghjkl")
1342 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001343
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001344 self.assertEqual(b"as", rw.read(2))
1345 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001346 rw.seek(0, 0)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001347 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00001348
1349 rw.write(b"asdf")
1350 rw.seek(0, 0)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001351 self.assertEqual(b"asdfasdfl", rw.read())
1352 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001353 rw.seek(-4, 2)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001354 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001355 rw.seek(2, 1)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001356 self.assertEqual(7, rw.tell())
1357 self.assertEqual(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001358 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001359
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001360 def check_flush_and_read(self, read_func):
1361 raw = self.BytesIO(b"abcdefghi")
1362 bufio = self.tp(raw)
1363
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001364 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001365 bufio.write(b"12")
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001366 self.assertEqual(b"ef", read_func(bufio, 2))
1367 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001368 bufio.flush()
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001369 self.assertEqual(6, bufio.tell())
1370 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001371 raw.seek(0, 0)
1372 raw.write(b"XYZ")
1373 # flush() resets the read buffer
1374 bufio.flush()
1375 bufio.seek(0, 0)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001376 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001377
1378 def test_flush_and_read(self):
1379 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1380
1381 def test_flush_and_readinto(self):
1382 def _readinto(bufio, n=-1):
1383 b = bytearray(n if n >= 0 else 9999)
1384 n = bufio.readinto(b)
1385 return bytes(b[:n])
1386 self.check_flush_and_read(_readinto)
1387
1388 def test_flush_and_peek(self):
1389 def _peek(bufio, n=-1):
1390 # This relies on the fact that the buffer can contain the whole
1391 # raw stream, otherwise peek() can return less.
1392 b = bufio.peek(n)
1393 if n != -1:
1394 b = b[:n]
1395 bufio.seek(len(b), 1)
1396 return b
1397 self.check_flush_and_read(_peek)
1398
1399 def test_flush_and_write(self):
1400 raw = self.BytesIO(b"abcdefghi")
1401 bufio = self.tp(raw)
1402
1403 bufio.write(b"123")
1404 bufio.flush()
1405 bufio.write(b"45")
1406 bufio.flush()
1407 bufio.seek(0, 0)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001408 self.assertEqual(b"12345fghi", raw.getvalue())
1409 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001410
def test_threads(self):
    # Run the threading test of both the reader and the writer test
    # classes against this object.
    for base in (BufferedReaderTest, BufferedWriterTest):
        base.test_threads(self)
1414
def test_writes_and_peek(self):
    # First callback handed to check_writes(): peek at the current
    # position.
    def _peek_here(bufio):
        bufio.peek(1)
    self.check_writes(_peek_here)

    # Second callback: step back one byte, peek there, then restore the
    # original position.
    def _peek_behind(bufio):
        saved = bufio.tell()
        bufio.seek(-1, 1)
        bufio.peek(1)
        bufio.seek(saved, 0)
    self.check_writes(_peek_behind)
1425
def test_writes_and_reads(self):
    # Hand check_writes() a callback that steps back one byte and reads
    # it again with read().
    def _reread_last_byte(bufio):
        bufio.seek(-1, 1)
        bufio.read(1)
    self.check_writes(_reread_last_byte)
1431
def test_writes_and_read1s(self):
    # Hand check_writes() a callback that steps back one byte and reads
    # it again with read1().
    def _reread_last_byte(bufio):
        bufio.seek(-1, 1)
        bufio.read1(1)
    self.check_writes(_reread_last_byte)
1437
def test_writes_and_readintos(self):
    # Hand check_writes() a callback that steps back one byte and reads
    # it again into a one-byte scratch buffer.
    def _reread_last_byte(bufio):
        bufio.seek(-1, 1)
        scratch = bytearray(1)
        bufio.readinto(scratch)
    self.check_writes(_reread_last_byte)
1443
Antoine Pitrou0473e562009-08-06 20:52:43 +00001444 def test_write_after_readahead(self):
1445 # Issue #6629: writing after the buffer was filled by readahead should
1446 # first rewind the raw stream.
1447 for overwrite_size in [1, 5]:
1448 raw = self.BytesIO(b"A" * 10)
1449 bufio = self.tp(raw, 4)
1450 # Trigger readahead
1451 self.assertEqual(bufio.read(1), b"A")
1452 self.assertEqual(bufio.tell(), 1)
1453 # Overwriting should rewind the raw stream if it needs so
1454 bufio.write(b"B" * overwrite_size)
1455 self.assertEqual(bufio.tell(), overwrite_size + 1)
1456 # If the write size was smaller than the buffer size, flush() and
1457 # check that rewind happens.
1458 bufio.flush()
1459 self.assertEqual(bufio.tell(), overwrite_size + 1)
1460 s = raw.getvalue()
1461 self.assertEqual(s,
1462 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1463
Antoine Pitrou66f9fea2010-01-31 23:20:26 +00001464 def test_truncate_after_read_or_write(self):
1465 raw = self.BytesIO(b"A" * 10)
1466 bufio = self.tp(raw, 100)
1467 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1468 self.assertEqual(bufio.truncate(), 2)
1469 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1470 self.assertEqual(bufio.truncate(), 4)
1471
def test_misbehaved_io(self):
    # Run the misbehaved-raw-stream test of both the reader and the
    # writer test classes against this object.
    for base in (BufferedReaderTest, BufferedWriterTest):
        base.test_misbehaved_io(self)
1475
class CBufferedRandomTest(BufferedRandomTest):
    # Run the BufferedRandom tests against io.BufferedRandom.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel, so only wider builds are
        # checked.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        expected_errors = (OverflowError, MemoryError, ValueError)
        with self.assertRaises(expected_errors):
            bufio.__init__(rawio, sys.maxsize)

    def test_garbage_collection(self):
        # Run the garbage-collection test of both the reader and the
        # writer test classes against this object.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)
1492
class PyBufferedRandomTest(BufferedRandomTest):
    # Run the inherited BufferedRandom tests against pyio.BufferedRandom.
    tp = pyio.BufferedRandom
1495
1496
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001497# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1498# properties:
1499# - A single output character can correspond to many bytes of input.
1500# - The number of input bytes to complete the character can be
1501# undetermined until the last input byte is received.
1502# - The number of input bytes can vary depending on previous input.
1503# - A single input byte can correspond to many characters of output.
1504# - The number of output characters can be undetermined until the
1505# last input byte is received.
1506# - The number of output characters can vary depending on previous input.
1507
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    A missing number after 'i' or 'o' counts as 0.  I and O are initially
    set to 1.  A control word is consumed when it is processed, so a new
    I or O applies to the bytes that follow it.  EOF also terminates the
    last word.
    """

    # Class-wide switch consulted by lookupTestDecoder(); the codec can
    # only be looked up while this is true.
    codecEnabled = False

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1, nothing buffered."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return ``(buffered bytes, flags)`` with I and O packed in flags."""
        # XOR with 1 so that flags == 0 right after reset().
        i, o = self.i ^ 1, self.o ^ 1
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously returned by getstate()."""
        buffer, flags = state
        self.buffer = bytearray(buffer)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Consume the bytes of *input* and return the completed words.

        When *final* is true, a partially buffered word is completed too.
        """
        pieces = []
        for b in input:
            if self.i == 0:  # variable-length, terminated with period
                if b == ord('.'):
                    if self.buffer:
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else:  # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer:  # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Interpret the buffered word, clear the buffer, return its output.

        Control words ('i...' and 'o...') update I or O and return ''.
        """
        output = ''
        kind = self.buffer[0]
        if kind == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif kind == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            output = self.buffer.decode('ascii')
            if self.o:
                # pad out with hyphens, then truncate to output length
                output = output.ljust(self.o, '-')[:self.o]
            output += '.'
        self.buffer = bytearray()
        return output

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function for the name 'test_decoder'.

        Returns a CodecInfo whose incremental decoder is this class while
        codecEnabled is true and *name* matches; returns None otherwise.
        """
        if not (cls.codecEnabled and name == 'test_decoder'):
            return None
        latin1 = codecs.lookup('latin-1')
        return codecs.CodecInfo(
            name='test_decoder', encode=latin1.encode, decode=None,
            incrementalencoder=None,
            streamreader=None, streamwriter=None,
            incrementaldecoder=cls)
1592
# Make 'test_decoder' resolvable through the codec registry.  The search
# function only answers while StatefulIncrementalDecoder.codecEnabled is
# true, so the codec stays disabled until a test switches it on.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1596
1597
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001598class StatefulIncrementalDecoderTest(unittest.TestCase):
1599 """
1600 Make sure the StatefulIncrementalDecoder actually works.
1601 """
1602
1603 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001604 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001605 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001606 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001607 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001608 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001609 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001610 # I=0, O=6 (variable-length input, fixed-length output)
1611 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1612 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001613 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001614 # I=6, O=3 (fixed-length input > fixed-length output)
1615 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1616 # I=0, then 3; O=29, then 15 (with longer output)
1617 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1618 'a----------------------------.' +
1619 'b----------------------------.' +
1620 'cde--------------------------.' +
1621 'abcdefghijabcde.' +
1622 'a.b------------.' +
1623 '.c.------------.' +
1624 'd.e------------.' +
1625 'k--------------.' +
1626 'l--------------.' +
1627 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001628 ]
1629
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001630 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001631 # Try a few one-shot test cases.
1632 for input, eof, output in self.test_cases:
1633 d = StatefulIncrementalDecoder()
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001634 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001635
1636 # Also test an unfinished decode, followed by forcing EOF.
1637 d = StatefulIncrementalDecoder()
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001638 self.assertEqual(d.decode(b'oiabcd'), '')
1639 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001640
1641class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001642
def setUp(self):
    # Sample bytes with mixed line endings, and the same text with every
    # line ending written as "\n".
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = "AAA\nBBB\nCCC\nDDD\nEEE\n"
    # Make sure the scratch file does not exist before the test runs.
    support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001647
def tearDown(self):
    # Remove the scratch file a test may have created.
    scratch = support.TESTFN
    support.unlink(scratch)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001650
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001651 def test_constructor(self):
1652 r = self.BytesIO(b"\xc3\xa9\n\n")
1653 b = self.BufferedReader(r, 1000)
1654 t = self.TextIOWrapper(b)
1655 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001656 self.assertEqual(t.encoding, "latin1")
1657 self.assertEqual(t.line_buffering, False)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001658 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001659 self.assertEqual(t.encoding, "utf8")
1660 self.assertEqual(t.line_buffering, True)
1661 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001662 self.assertRaises(TypeError, t.__init__, b, newline=42)
1663 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1664
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001665 def test_detach(self):
1666 r = self.BytesIO()
1667 b = self.BufferedWriter(r)
1668 t = self.TextIOWrapper(b)
1669 self.assertIs(t.detach(), b)
1670
1671 t = self.TextIOWrapper(b, encoding="ascii")
1672 t.write("howdy")
1673 self.assertFalse(r.getvalue())
1674 t.detach()
1675 self.assertEqual(r.getvalue(), b"howdy")
1676 self.assertRaises(ValueError, t.detach)
1677
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001678 def test_repr(self):
1679 raw = self.BytesIO("hello".encode("utf-8"))
1680 b = self.BufferedReader(raw)
1681 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001682 modname = self.TextIOWrapper.__module__
1683 self.assertEqual(repr(t),
1684 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1685 raw.name = "dummy"
1686 self.assertEqual(repr(t),
1687 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1688 raw.name = b"dummy"
1689 self.assertEqual(repr(t),
1690 "<%s.TextIOWrapper name=b'dummy' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001691
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001692 def test_line_buffering(self):
1693 r = self.BytesIO()
1694 b = self.BufferedWriter(r, 1000)
1695 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001696 t.write("X")
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001697 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001698 t.write("Y\nZ")
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001699 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001700 t.write("A\rB")
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001701 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001702
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001703 def test_encoding(self):
1704 # Check the encoding attribute is always set, and valid
1705 b = self.BytesIO()
1706 t = self.TextIOWrapper(b, encoding="utf8")
1707 self.assertEqual(t.encoding, "utf8")
1708 t = self.TextIOWrapper(b)
Georg Brandlab91fde2009-08-13 08:51:18 +00001709 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001710 codecs.lookup(t.encoding)
1711
1712 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001713 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001714 b = self.BytesIO(b"abc\n\xff\n")
1715 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001716 self.assertRaises(UnicodeError, t.read)
1717 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001718 b = self.BytesIO(b"abc\n\xff\n")
1719 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001720 self.assertRaises(UnicodeError, t.read)
1721 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001722 b = self.BytesIO(b"abc\n\xff\n")
1723 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001724 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001725 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001726 b = self.BytesIO(b"abc\n\xff\n")
1727 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001728 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001729
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001730 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001731 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001732 b = self.BytesIO()
1733 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001734 self.assertRaises(UnicodeError, t.write, "\xff")
1735 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001736 b = self.BytesIO()
1737 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001738 self.assertRaises(UnicodeError, t.write, "\xff")
1739 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001740 b = self.BytesIO()
1741 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001742 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001743 t.write("abc\xffdef\n")
1744 t.flush()
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001745 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001746 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001747 b = self.BytesIO()
1748 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001749 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001750 t.write("abc\xffdef\n")
1751 t.flush()
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001752 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001753
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001754 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001755 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1756
1757 tests = [
1758 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001759 [ '', input_lines ],
1760 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1761 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1762 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001763 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001764 encodings = (
1765 'utf-8', 'latin-1',
1766 'utf-16', 'utf-16-le', 'utf-16-be',
1767 'utf-32', 'utf-32-le', 'utf-32-be',
1768 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001769
Guido van Rossum8358db22007-08-18 21:39:55 +00001770 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001771 # character in TextIOWrapper._pending_line.
1772 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001773 # XXX: str.encode() should return bytes
1774 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001775 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001776 for bufsize in range(1, 10):
1777 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001778 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1779 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001780 encoding=encoding)
1781 if do_reads:
1782 got_lines = []
1783 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001784 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001785 if c2 == '':
1786 break
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001787 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001788 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001789 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001790 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001791
1792 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001793 self.assertEqual(got_line, exp_line)
1794 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00001795
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001796 def test_newlines_input(self):
1797 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001798 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1799 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001800 (None, normalized.decode("ascii").splitlines(True)),
1801 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001802 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1803 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1804 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001805 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001806 buf = self.BytesIO(testdata)
1807 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001808 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00001809 txt.seek(0)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001810 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00001811
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001812 def test_newlines_output(self):
1813 testdict = {
1814 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1815 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1816 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1817 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1818 }
1819 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1820 for newline, expected in tests:
1821 buf = self.BytesIO()
1822 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1823 txt.write("AAA\nB")
1824 txt.write("BB\nCCC\n")
1825 txt.write("X\rY\r\nZ")
1826 txt.flush()
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001827 self.assertEqual(buf.closed, False)
1828 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001829
def test_destructor(self):
    # Dropping the last reference to the wrapper must get the pending
    # text into the buffer before the buffer's close() runs.
    seen_at_close = []
    base = self.BytesIO
    class MyBytesIO(base):
        def close(self):
            # Record what had reached the buffer when close() was called.
            seen_at_close.append(self.getvalue())
            base.close(self)
    raw = MyBytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.write("abc")
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001843
def test_override_destructor(self):
    # Overridden __del__, close and flush must be called, in that order,
    # when the wrapper is destroyed.
    calls = []
    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # Chain to the inherited finalizer when there is one.
            try:
                inherited = super().__del__
            except AttributeError:
                pass
            else:
                inherited()
        def close(self):
            calls.append(2)
            super().close()
        def flush(self):
            calls.append(3)
            super().flush()
    raw = self.BytesIO()
    wrapper = MyTextIO(raw, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
1866
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()
    def trigger():
        # The attribute lookup fails on a temporary wrapper.
        self.TextIOWrapper(rawio).xyzzy
    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, trigger)
    message = captured.getvalue().strip()
    if not message:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(message.splitlines()), 1)
    self.assertTrue(message.startswith("Exception IOError: "), message)
    self.assertTrue(message.endswith(" ignored"), message)
Guido van Rossum8358db22007-08-18 21:39:55 +00001881
Guido van Rossum9b76da62007-04-11 01:09:03 +00001882 # Systematic tests of the text I/O API
1883
def test_basic_io(self):
    # Exercise write/read/seek/tell round trips for several chunk sizes
    # and encodings.  Files are opened in ``with`` blocks so that a
    # failing assertion cannot leave the scratch file open.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin1", "utf8":  # , "utf-16-be", "utf-16-le":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position at end of file, reused for the checks below.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
1912
def multi_line_test(self, f, enc):
    """Write lines of various lengths to *f* and read them back.

    The position reported by tell() before each line is written must
    match the position reported before the same line is read.  *enc* is
    accepted but not used here.
    """
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    period = len(sample)
    written = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Repeat the sample characters cyclically up to the wanted size.
        line = "".join(sample[i % period] for i in range(size)) + "\n"
        written.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    read_back = []
    while True:
        pos = f.tell()
        line = f.readline()
        if not line:
            break
        read_back.append((pos, line))
    self.assertEqual(read_back, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001934
def test_telling(self):
    # tell() must return the same positions while reading lines back as
    # it did while they were written.  The file is opened in a ``with``
    # block so that a failing assertion cannot leave it open.
    with self.open(support.TESTFN, "w+", encoding="utf8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is refused while the file is being iterated over.
            self.assertRaises(IOError, f.tell)
        self.assertEqual(f.tell(), p2)
1954
def test_seeking(self):
    # Each line is an ASCII prefix two characters shorter than
    # chunk_size, followed by a multi-byte character and a newline.
    # NOTE(review): _default_chunk_size() is defined elsewhere in this
    # file; presumably it is the TextIOWrapper chunk size, so that the
    # multi-byte character straddles a chunk boundary -- confirm.
    chunk_size = _default_chunk_size()
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = u_suffix.encode("utf-8")
    line = prefix + suffix
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    # Use a ``with`` block so the read handle is always closed.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        s = f.read(prefix_size)
        self.assertEqual(s, str(prefix, "ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00001972
def test_seeking_too(self):
    # Regression test for a specific bug: tell() after readline() on a
    # line starting with a three-byte UTF-8 character, using a chunk
    # size of 2.  The test passes if no exception is raised.
    data = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as f:
        f.write(data)
    # Use a ``with`` block so the read handle is always closed.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        f.tell()
1984
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def check_seek_and_tell(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            # NOTE(review): only this handle gets the reduced chunk
            # size; the handles opened in the loop below keep their
            # default.  Confirm that this is intended.
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                # ``with`` closes the file even if an assertion fails.
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = True

    # Run the tests.
    try:
        # Try each test case.
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            check_seek_and_tell(data)

        # Position each test case so that it crosses a chunk boundary.
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(data)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            check_seek_and_tell(prefix + data, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = False
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002031
def test_encoded_writes(self):
    """Write the same text twice under each UTF-16/UTF-32 codec.

    The text read back through the wrapper, and the raw bytes left in the
    underlying buffer, must both equal a single encoding of the doubled
    text.
    """
    payload = "1234567890"
    doubled = payload * 2
    codec_names = (
        "utf-16", "utf-16-le", "utf-16-be",
        "utf-32", "utf-32-le", "utf-32-be",
    )
    for codec_name in codec_names:
        raw = self.BytesIO()
        wrapper = self.TextIOWrapper(raw, encoding=codec_name)
        # Check if the BOM is written only once (see issue1753).
        for _ in range(2):
            wrapper.write(payload)
        # Rewind and read back twice; both passes must agree.
        for _ in range(2):
            wrapper.seek(0)
            self.assertEqual(wrapper.read(), doubled)
        self.assertEqual(raw.getvalue(), doubled.encode(codec_name))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002051
def test_unreadable(self):
    """Reading text from a buffer that reports ``readable() == False``
    must raise IOError."""
    class NoReadBytes(self.BytesIO):
        def readable(self):
            return False

    wrapper = self.TextIOWrapper(NoReadBytes())
    with self.assertRaises(IOError):
        wrapper.read()
2058
def test_read_one_by_one(self):
    """Read ``b"AA\\r\\nBB"`` one character per call and expect the
    "\\r\\n" pair to come back as a single "\\n"."""
    txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    # iter() with a sentinel stops at the first empty read (end of data).
    pieces = list(iter(lambda: txt.read(1), ""))
    self.assertEqual("".join(pieces), "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002068
def test_readlines(self):
    """Check readlines() with no hint, a None hint and a hint of 5."""
    wrapper = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
    every_line = ["AA\n", "BB\n", "CC"]

    self.assertEqual(wrapper.readlines(), every_line)

    wrapper.seek(0)
    self.assertEqual(wrapper.readlines(None), every_line)

    # With a hint of 5 only the first two lines are expected back.
    wrapper.seek(0)
    self.assertEqual(wrapper.readlines(5), every_line[:2])
2076
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002077 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
def test_read_by_chunk(self):
    """Read in 128-character requests across a "\\r\\n" that starts at
    offset 127, and expect it to be translated to a single "\\n"."""
    # make sure "\r\n" straddles 128 char boundary.
    txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
    # Keep requesting 128 characters until an empty string signals EOF.
    chunks = list(iter(lambda: txt.read(128), ""))
    self.assertEqual("".join(chunks), "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002088
def test_issue1395_1(self):
    """Reading ``self.testdata`` one character at a time must yield
    ``self.normalized`` (both fixtures are set up outside this method)."""
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")

    # read one char at a time
    chars = list(iter(lambda: txt.read(1), ""))
    self.assertEqual("".join(chars), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002100
def test_issue1395_2(self):
    """Same check as the one-character variant, but with the wrapper's
    ``_CHUNK_SIZE`` set to 4 and reads of 4 characters at a time."""
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # Collect 4-character reads until an empty string signals EOF.
    pieces = list(iter(lambda: txt.read(4), ""))
    self.assertEqual("".join(pieces), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002112
def test_issue1395_3(self):
    """Mix two read(4) calls with three readline() calls on a wrapper
    whose ``_CHUNK_SIZE`` is 4; the pieces must join to
    ``self.normalized``."""
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # List elements are evaluated left to right, so the read order is
    # read(4), read(4), then the three readline() calls.
    collected = [txt.read(4), txt.read(4)]
    for _ in range(3):
        collected.append(txt.readline())
    self.assertEqual("".join(collected), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002123
def test_issue1395_4(self):
    """A read(4) followed by an unbounded read() on a wrapper whose
    ``_CHUNK_SIZE`` is 4 must together yield ``self.normalized``."""
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    head = txt.read(4)
    tail = txt.read()
    self.assertEqual(head + tail, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002131
def test_issue1395_5(self):
    """After read(4), a tell() cookie must survive a seek(0) / seek(cookie)
    round trip: the next read(4) is expected to return "BBB\\n"."""
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    txt.read(4)  # advance past the first four characters
    bookmark = txt.tell()
    txt.seek(0)
    txt.seek(bookmark)
    self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002141
def test_issue2282(self):
    """The wrapper must report the same seekable() value as the buffer it
    wraps."""
    raw = self.BytesIO(self.testdata)
    wrapper = self.TextIOWrapper(raw, encoding="ascii")

    self.assertEqual(raw.seekable(), wrapper.seekable())
2147
def test_append_bom(self):
    """Appending to a non-empty file must not write a second BOM."""
    # The BOM is not written again when appending to a non-empty file
    path = support.TESTFN

    def raw_bytes():
        # Fetch the whole file in binary mode for byte-level comparison.
        with self.open(path, 'rb') as stream:
            return stream.read()

    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as writer:
            writer.write('aaa')
            writer.tell()  # result unused; the call itself is kept
        self.assertEqual(raw_bytes(), 'aaa'.encode(charset))

        with self.open(path, 'a', encoding=charset) as appender:
            appender.write('xxx')
        self.assertEqual(raw_bytes(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002162
def test_seek_bom(self):
    """Writing after manual seeks must not add a second BOM.

    The file is extended at the saved end position and then overwritten
    from position 0; its bytes must equal one encoding of 'bbbzzz'.
    """
    # Same test, but when seeking manually
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as writer:
            writer.write('aaa')
            end_of_text = writer.tell()
        with self.open(path, 'r+', encoding=charset) as editor:
            # First extend at the old end, then overwrite from the start.
            for cookie, text in ((end_of_text, 'zzz'), (0, 'bbb')):
                editor.seek(cookie)
                editor.write(text)
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002177
def test_errors_property(self):
    """The ``errors`` attribute is "strict" by default and otherwise
    echoes the value passed to open()."""
    cases = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for extra_kwargs, expected in cases:
        with self.open(support.TESTFN, "w", **extra_kwargs) as f:
            self.assertEqual(f.errors, expected)
2183
Antoine Pitroue4501852009-05-14 18:55:55 +00002184
def test_threads_write(self):
    """Twenty threads each write one distinct line to the same text file;
    every line must appear in the file exactly once."""
    # Issue6750: concurrent writes could duplicate data
    go = threading.Event()
    lines = ["Thread%03d\n" % n for n in range(20)]
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def emit(line):
            # Block until released so all writers start together.
            go.wait()
            f.write(line)

        workers = [threading.Thread(target=emit, args=(line,))
                   for line in lines]
        for worker in workers:
            worker.start()
        time.sleep(0.02)
        go.set()
        for worker in workers:
            worker.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
        for line in lines:
            self.assertEqual(content.count(line), 1)
Amaury Forgeot d'Arcaf0312a2009-08-29 23:19:16 +00002205
def test_flush_error_on_close(self):
    """An IOError raised by flush() must propagate out of close()."""
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")

    def failing_flush():
        raise IOError()

    # Shadow the instance's flush so that close() hits the failure.
    txt.flush = failing_flush
    with self.assertRaises(IOError):  # exception not swallowed
        txt.close()
2212
def test_multi_close(self):
    """close() may be called repeatedly; flush() afterwards must raise
    ValueError."""
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    for _ in range(3):
        txt.close()
    with self.assertRaises(ValueError):
        txt.flush()
2219
def test_readonly_attributes(self):
    """Assigning to the wrapper's ``buffer`` attribute must raise
    AttributeError."""
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    replacement = self.BytesIO(self.testdata)
    # setattr() performs the same assignment as ``txt.buffer = ...``.
    self.assertRaises(AttributeError, setattr, txt, "buffer", replacement)
2225
class CTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapperTest subclass adding a re-initialisation check and a
    garbage-collection check."""

    def test_initialization(self):
        """A rejected ``__init__`` call must leave the wrapper unreadable."""
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        rejected = (
            (42, TypeError),
            ('xyzzy', ValueError),
        )
        for bad_newline, expected_error in rejected:
            self.assertRaises(expected_error, wrapper.__init__, buffered,
                              newline=bad_newline)
            # After the failed re-init every read must fail as well.
            self.assertRaises(ValueError, wrapper.read)

    def test_garbage_collection(self):
        """A wrapper caught in a reference cycle must be collected, and its
        pending text must reach the file."""
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        raw_file = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(raw_file)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        wrapper.x = wrapper  # self-reference: only the cycle GC can free it
        probe = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertTrue(probe() is None, probe)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")
2252
class PyTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapperTest subclass that adds no tests of its own."""
2255
2256
class IncrementalNewlineDecoderTest(unittest.TestCase):
    """Checks for ``self.IncrementalNewlineDecoder``.

    The decoder class is read from an attribute rather than named directly;
    ``test_main`` sets that attribute on the C- and Py-prefixed subclasses.
    """

    def check_newline_decoding_utf8(self, decoder):
        """Drive *decoder* through UTF-8 specific newline scenarios."""
        # UTF-8 specific tests for a newline decoder
        def roundtrip(data, expected, **kwargs):
            # We exercise getstate() / setstate() as well as decode()
            saved = decoder.getstate()
            self.assertEqual(decoder.decode(data, **kwargs), expected)
            decoder.setstate(saved)
            self.assertEqual(decoder.decode(data, **kwargs), expected)

        def run_steps(steps):
            # Each step is (data, expected) or (data, expected, kwargs).
            for step in steps:
                data, expected = step[:2]
                options = step[2] if len(step) > 2 else {}
                roundtrip(data, expected, **options)

        split_char = (
            (b'\xe8', ""),
            (b'\xa2', ""),
            (b'\x88', "\u8888"),
        )
        run_steps([(b'\xe8\xa2\x88', "\u8888")])
        # A character delivered one byte at a time, twice in a row.
        run_steps(split_char)
        run_steps(split_char)

        # A truncated character followed by a final, empty decode.
        run_steps([(b'\xe8', "")])
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        final = {"final": True}
        run_steps([
            (b'\n', "\n"),
            (b'\r', ""),
            (b'', "\n", final),
            (b'\r', "\n", final),

            (b'\r', ""),
            (b'a', "\na"),

            (b'\r\r\n', "\n\n"),
            (b'\r', ""),
            (b'\r', "\n"),
            (b'\na', "\na"),

            (b'\xe8\xa2\x88\r\n', "\u8888\n"),
            (b'\xe8\xa2\x88', "\u8888"),
            (b'\n', "\n"),
            (b'\xe8\xa2\x88\r', "\u8888"),
            (b'\n', "\n"),
        ])

    def check_newline_decoding(self, decoder, encoding):
        """Feed *decoder* one unit at a time and track ``newlines``.

        With an *encoding* the text is encoded first and fed one byte at a
        time; with ``None`` the text itself is fed one character at a time.
        """
        pieces = []
        if encoding is not None:
            encoder = codecs.getincrementalencoder(encoding)()
        else:
            encoder = None

        def feed(text):
            if encoder is None:
                # Decode one char at a time
                units = text
            else:
                # Decode one byte at a time
                units = [bytes([b]) for b in encoder.encode(text)]
            for unit in units:
                pieces.append(decoder.decode(unit))

        self.assertEqual(decoder.newlines, None)
        # (text to feed, value of decoder.newlines expected afterwards)
        steps = (
            ("abc\n\r", '\n'),
            ("\nabc", ('\n', '\r\n')),
            ("abc\r", ('\n', '\r\n')),
            ("abc", ('\r', '\n', '\r\n')),
        )
        for text, seen_newlines in steps:
            feed(text)
            self.assertEqual(decoder.newlines, seen_newlines)
        feed("abc\r")
        self.assertEqual("".join(pieces), "abc\n\nabcabc\nabcabc")

        # After reset() the newline history must be gone.
        decoder.reset()
        probe = "abc"
        if encoder is not None:
            encoder.reset()
            probe = encoder.encode(probe)
        self.assertEqual(decoder.decode(probe), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        """Run the generic check for each encoding, then the UTF-8 one."""
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc is None:
                inner = None
            else:
                inner = codecs.getincrementaldecoder(enc)()
            wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(wrapped, enc)

        utf8_inner = codecs.getincrementaldecoder("utf-8")()
        utf8_wrapped = self.IncrementalNewlineDecoder(utf8_inner,
                                                      translate=True)
        self.check_newline_decoding_utf8(utf8_wrapped)

    def test_newline_bytes(self):
        """U+0D00 and U+0A00 must pass through unchanged and must not be
        recorded as newlines, with or without translation."""
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        for translate in (False, True):
            dec = self.IncrementalNewlineDecoder(None, translate=translate)
            self.assertEqual(dec.newlines, None)
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)
2361 _check(dec)
2362
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoderTest subclass with no tests of its own."""
2365
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoderTest subclass with no tests of its own."""
Antoine Pitrou180a3362008-12-14 16:36:46 +00002368
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002369
Guido van Rossum01a27522007-03-07 01:00:12 +00002370# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002371
class MiscIOTest(unittest.TestCase):
    """Miscellaneous checks of the io namespace held in ``self.io``.

    Subclasses set the ``io`` class attribute; the remaining names used
    here (``open``, ``IOBase``, ``BlockingIOError`` ...) are attached to
    those subclasses by ``test_main``.
    """

    def tearDown(self):
        # Remove the scratch file the tests create.
        support.unlink(support.TESTFN)

    def test___all__(self):
        """Every name in ``__all__`` must exist and be of the expected
        kind: an exception class, a SEEK_ constant or an IOBase subclass."""
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertTrue(obj is not None, name)
            if name == "open":
                continue
            elif "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase))

    def test_attributes(self):
        """Check ``mode`` and ``name`` on each layer returned by open().

        Every file is opened in a ``with`` block so that a failing
        assertion cannot leave the handle open.
        """
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        with self.open(support.TESTFN, "U") as f:
            self.assertEqual(f.name, support.TESTFN)
            self.assertEqual(f.buffer.name, support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode, "U")
            self.assertEqual(f.buffer.mode, "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        with self.open(support.TESTFN, "w+") as f:
            self.assertEqual(f.mode, "w+")
            self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
            self.assertEqual(f.buffer.raw.mode, "rb+")

            # A second object over the same descriptor; closefd=False
            # leaves ownership of the descriptor with ``f``.
            with self.open(f.fileno(), "wb", closefd=False) as g:
                self.assertEqual(g.mode, "wb")
                self.assertEqual(g.raw.mode, "wb")
                self.assertEqual(g.name, f.fileno())
                self.assertEqual(g.raw.name, f.fileno())

    def test_io_after_close(self):
        """Every I/O method of a closed file must raise ValueError, for a
        range of mode / buffering combinations."""
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            # peek / read1 / readinto are only checked where they exist.
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            # Binary files take bytes, text files take str.
            self.assertRaises(ValueError, f.write,
                              b"" if "b" in kwargs['mode'] else "")
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        """Check BlockingIOError argument validation, the default of
        ``characters_written`` and collection of a reference cycle."""
        # Various BlockingIOError issues
        self.assertRaises(TypeError, self.BlockingIOError)
        self.assertRaises(TypeError, self.BlockingIOError, 1)
        self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
        self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
        b = self.BlockingIOError(1, "")
        self.assertEqual(b.characters_written, 0)
        class C(str):
            pass
        c = C("")
        b = self.BlockingIOError(1, c)
        # Tie the exception and its message into a cycle, drop both names
        # and check that the collector frees them.
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertTrue(wr() is None, wr)

    def test_abcs(self):
        """The four visible base classes must be ABCs."""
        # Test the visible base classes are ABCs.
        self.assertTrue(isinstance(self.IOBase, abc.ABCMeta))
        self.assertTrue(isinstance(self.RawIOBase, abc.ABCMeta))
        self.assertTrue(isinstance(self.BufferedIOBase, abc.ABCMeta))
        self.assertTrue(isinstance(self.TextIOBase, abc.ABCMeta))

    def _check_abc_inheritance(self, abcmodule):
        """Check that raw, buffered and text files from ``self.open`` are
        instances of exactly the matching ABCs taken from *abcmodule*."""
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertTrue(isinstance(f, abcmodule.IOBase))
            self.assertTrue(isinstance(f, abcmodule.RawIOBase))
            self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
            self.assertFalse(isinstance(f, abcmodule.TextIOBase))
        with self.open(support.TESTFN, "wb") as f:
            self.assertTrue(isinstance(f, abcmodule.IOBase))
            self.assertFalse(isinstance(f, abcmodule.RawIOBase))
            self.assertTrue(isinstance(f, abcmodule.BufferedIOBase))
            self.assertFalse(isinstance(f, abcmodule.TextIOBase))
        with self.open(support.TESTFN, "w") as f:
            self.assertTrue(isinstance(f, abcmodule.IOBase))
            self.assertFalse(isinstance(f, abcmodule.RawIOBase))
            self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
            self.assertTrue(isinstance(f, abcmodule.TextIOBase))

    def test_abc_inheritance(self):
        """Check against the ABCs attached to this test class."""
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        """Check against the ABCs of the module-level ``io``."""
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)
2507
class CMiscIOTest(MiscIOTest):
    """MiscIOTest bound to the module-level ``io``."""

    io = io
2510
class PyMiscIOTest(MiscIOTest):
    """MiscIOTest bound to the module-level ``pyio``."""

    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002513
Antoine Pitrou16b11de2010-08-21 19:17:57 +00002514
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Checks how writes behave when SIGALRM arrives in the middle of them.

    Subclasses provide the ``io`` attribute naming the module under test.
    """

    def setUp(self):
        # Install the raising handler and remember the previous one.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        """SIGALRM handler: raise ZeroDivisionError as a recognisable
        marker that the handler ran."""
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler.

        *item* is repeated to build the data written through
        ``self.io.open``; *bytes* is what the first two bytes read from
        the pipe are compared with; *fdopen_kwargs* go to ``open``.
        """
        read_results = []
        def _read():
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        # Bound before the try block so that the cleanup below never hits
        # an unbound name when open() itself fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            signal.alarm(1)
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed above.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            self.assertRaises(ZeroDivisionError,
                              wio.write, item * (1024 * 1024))
            t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            # Cancel an alarm that has not fired yet so that it cannot go
            # off after tearDown() has restored the previous handler.
            signal.alarm(0)
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        """Interrupted write on an unbuffered binary file."""
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        """Interrupted write on a buffered binary file."""
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        """Interrupted write on an ASCII text file."""
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Write *data* in a loop while a SIGALRM handler writes to the
        same file object.

        Either RuntimeError or ZeroDivisionError must come out of the loop.
        """
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)):
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
        finally:
            # Make sure the handler cannot fire during or after cleanup.
            signal.alarm(0)
            try:
                wio.close()
            finally:
                # Release the read end even if closing wio raised.
                os.close(r)

    def test_reentrant_write_buffered(self):
        """Reentrant write on a buffered binary file."""
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        """Reentrant write on an ASCII text file."""
        self.check_reentrant_write("xy", mode="w", encoding="ascii")
2604
2605
class CSignalsTest(SignalsTest):
    """SignalsTest bound to the module-level ``io``."""

    io = io
2608
class PySignalsTest(SignalsTest):
    """SignalsTest bound to ``pyio``, with the reentrant-write tests
    switched off."""

    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = test_reentrant_write_text = None
2616
Antoine Pitrou16b11de2010-08-21 19:17:57 +00002617
def test_main():
    """Attach the io namespaces to the test classes and run them.

    Classes whose name starts with "C" receive the names from ``io``,
    those starting with "Py" the names from ``pyio``; each also gets the
    matching prefixed mock classes looked up in this module's globals.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockRawIOWithoutRead)
    exported = io.__all__ + ["IncrementalNewlineDecoder"]
    module_globals = globals()

    def build_namespace(module, prefix):
        # Public names of *module* plus the prefix-specific mock classes,
        # the latter stored under their unprefixed names.
        namespace = dict((name, getattr(module, name)) for name in exported)
        for mock in mocks:
            namespace[mock.__name__] = module_globals[prefix + mock.__name__]
        return namespace

    c_namespace = build_namespace(io, "C")
    py_namespace = build_namespace(pyio, "Py")
    # Avoid turning open into a bound method.
    py_namespace["open"] = pyio.OpenWrapper

    by_prefix = (("C", c_namespace), ("Py", py_namespace))
    for case in tests:
        for prefix, namespace in by_prefix:
            if case.__name__.startswith(prefix):
                for attr, value in namespace.items():
                    setattr(case, attr, value)
                break

    support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002652
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()