blob: 1ec6f93a0e01b833a9cbab7e0d8d30b0c1fbae4f [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import threading
27import random
Guido van Rossum28524c72007-02-27 05:47:44 +000028import unittest
Benjamin Peterson59406a92009-03-26 17:10:29 +000029import warnings
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000030import weakref
31import gc
32import abc
Antoine Pitrou16b11de2010-08-21 19:17:57 +000033import signal
34import errno
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000035from itertools import chain, cycle, count
36from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000038
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000039import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000040import io # C implementation of io
41import _pyio as pyio # Python implementation of io
Guido van Rossum28524c72007-02-27 05:47:44 +000042
Guido van Rossuma9e20242007-03-08 00:43:48 +000043
def _default_chunk_size():
    """Return the ``_CHUNK_SIZE`` of a freshly opened text-mode file.

    This very source file is opened (read-only, latin1) purely as a
    convenient text stream to read the attribute from.
    """
    with open(__file__, "r", encoding="latin1") as probe:
        chunk_size = probe._CHUNK_SIZE
    return chunk_size
48
49
class MockRawIOWithoutRead:
    """Raw stream double that deliberately defines no read().

    Reads are served through readinto() from a scripted list of chunks, so
    that the default RawIO.read() (which calls readinto()) gets exercised.
    Writes are recorded rather than sent anywhere.
    """

    def __init__(self, read_stack=()):
        # Scripted chunks handed out by readinto(); a None entry is
        # returned to the caller as None.
        self._read_stack = list(read_stack)
        # Every payload passed to write(), in call order.
        self._write_stack = []
        # Total readinto() calls, and how many of them found the script
        # already exhausted.
        self._reads = 0
        self._extraneous_reads = 0

    # -- capabilities ------------------------------------------------------

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def fileno(self):
        return 42

    # -- positioning (dummy values) ----------------------------------------

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def truncate(self, pos=None):
        return pos

    # -- data transfer -----------------------------------------------------

    def write(self, b):
        """Record an immutable copy of *b* and report ``len(b)``."""
        snapshot = bytes(b)
        self._write_stack.append(snapshot)
        return len(b)

    def readinto(self, buf):
        """Copy the next scripted chunk into *buf*.

        Returns the number of bytes stored, ``None`` when the next scripted
        entry is ``None``, or ``0`` once the script is exhausted.  A chunk
        larger than *buf* is split: the remainder stays at the head of the
        script for the next call.
        """
        self._reads += 1
        if not self._read_stack:
            self._extraneous_reads += 1
            return 0

        chunk = self._read_stack[0]
        if chunk is None:
            self._read_stack.pop(0)
            return None

        capacity = len(buf)
        size = len(chunk)
        if size > capacity:
            # Hand out only what fits and keep the tail for later.
            buf[:] = chunk[:capacity]
            self._read_stack[0] = chunk[capacity:]
            return capacity

        self._read_stack.pop(0)
        buf[:size] = chunk
        return size
105
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead combined with the C implementation's RawIOBase."""

class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead combined with the pure-Python RawIOBase."""
111
112
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus an explicit read() served from the script."""

    def read(self, n=None):
        """Return the next scripted chunk whole, ignoring *n*.

        Every call is counted in ``_reads``.  Once the script is exhausted
        the call is also counted in ``_extraneous_reads`` and ``b""`` is
        returned.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an empty script is expected here; anything else
            # (KeyboardInterrupt, a genuine bug) must not be swallowed.
            self._extraneous_reads += 1
            return b""
122
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO combined with the C implementation's RawIOBase."""

class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO combined with the pure-Python RawIOBase."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000128
Guido van Rossuma9e20242007-03-08 00:43:48 +0000129
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods report deliberately bogus results."""

    def write(self, b):
        # Record the data normally, then claim twice as much was written.
        recorded = super().write(b)
        return recorded * 2

    def read(self, n=None):
        # Hand back the scripted chunk repeated twice.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Fill the buffer as usual but report five times its length.
        super().readinto(buf)
        reported = len(buf) * 5
        return reported
146
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO combined with the C implementation's RawIOBase."""

class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO combined with the pure-Python RawIOBase."""
152
153
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() marks it closed and raises IOError."""

    # Plain class-level flag; close() overrides it on the instance.
    closed = 0

    def close(self):
        # Later calls are silent no-ops.
        if self.closed:
            return
        self.closed = 1
        raise IOError
161
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO combined with the C implementation's RawIOBase."""

class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO combined with the pure-Python RawIOBase."""
167
168
class MockFileIO:
    """Mixin that logs the size of every read()/readinto() result.

    It must be combined with a real stream class (the next class in the
    MRO supplies __init__, read and readinto); results are appended to
    ``read_history``.
    """

    def __init__(self, data):
        # Create the log before delegating construction to the real stream.
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        # Log the length of the data, or None when no data object came back.
        size = len(chunk) if chunk is not None else None
        self.read_history.append(size)
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000184
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO recording reads on the C implementation's BytesIO."""

class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO recording reads on the pure-Python BytesIO."""
190
191
class MockNonBlockWriterIO:
    """Writer double that can simulate a partial, blocking write.

    Written data is collected in memory.  After block_on() has armed a
    marker, the first write containing that marker stores only the bytes
    in front of it and raises ``self.BlockingIOError`` (an attribute the
    concrete subclass must provide).
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and empty the record."""
        collected = b"".join(self._write_stack)
        del self._write_stack[:]
        return collected

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        payload = bytes(b)
        marker = self._blocker_char
        if marker:
            cut = payload.find(marker)
            if cut != -1:
                # One-shot: disarm, keep the bytes before the marker and
                # report how many of them were accepted.
                self._blocker_char = None
                self._write_stack.append(payload[:cut])
                raise self.BlockingIOError(0, "test blocking", cut)
        self._write_stack.append(payload)
        return len(payload)
230
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO raising the C implementation's BlockingIOError."""

    BlockingIOError = io.BlockingIOError

class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO raising the pure-Python BlockingIOError."""

    BlockingIOError = pyio.BlockingIOError
236
Guido van Rossuma9e20242007-03-08 00:43:48 +0000237
class IOTest(unittest.TestCase):
    """Implementation-independent tests of basic file and stream I/O.

    The objects under test are reached through attributes on ``self``
    (``open``, ``FileIO``, ``BytesIO``, ``IOBase``, ``RawIOBase``,
    ``BufferedIOBase``, ``TextIOBase``, ``MockRawIOWithoutRead``).  None of
    them is defined in this class; presumably they are attached to the
    concrete subclasses elsewhere in the file -- verify against the test
    runner setup.
    """

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        """Run a fixed write/seek/truncate script against writable *f*.

        When it completes the stream holds b"hello world\\n" (see
        test_raw_bytes_io).
        """
        self.assertEqual(f.write(b"blah."), 5)
        # Truncating must not move the stream position.
        f.truncate(0)
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Run a fixed read/seek script against *f* holding b"hello world\\n".

        With *buffered* true, argument-less read() calls are checked too.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        # Only two bytes are left; the rest of the buffer keeps old data.
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by the large-file checks.
    LARGE = 2**31

    def large_file_ops(self, f):
        """Exercise seek/tell/write/truncate around offset ``LARGE`` on *f*."""
        # Use real assertions: bare ``assert`` statements vanish under -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(IOError, fp.read)
                self.assertRaises(IOError, fp.readline)
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(IOError, fp.write, b"blah")
            self.assertRaises(IOError, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(IOError, fp.write, "blah")
            self.assertRaises(IOError, fp.writelines, ["blah\n"])

    def test_raw_file_io(self):
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space,
        # therefore the resource must be enabled to run this test.
        if sys.platform.startswith('win') or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                # Report a real skip (as test_unbounded_file does) instead
                # of printing to stderr and passing silently.
                self.skipTest(
                    "large file ops skipped on %s: requires %d bytes and a "
                    "long time; use 'regrtest.py -u largefile test_io' to "
                    "run it" % (sys.platform, self.LARGE))
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                # The file must be closed even when the body raised.
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        """Check that destroying a *base* subclass runs __del__, close, flush."""
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        a = array.array('i', range(10))
        # Byte length of the array, without the deprecated tostring() copy.
        n = len(a) * a.itemsize
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r",
                          closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        # Create a reference cycle so only the collector can free it.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed

    def test_multi_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")
580
class CIOTest(IOTest):
    """IOTest variant intended for the C implementation of io.

    The class body adds nothing; the attributes the tests read are
    presumably attached elsewhere in the file -- TODO confirm.
    """

class PyIOTest(IOTest):
    """IOTest variant intended for the pure-Python implementation of io.

    The class body adds nothing; the attributes the tests read are
    presumably attached elsewhere in the file -- TODO confirm.
    """
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000586
Guido van Rossuma9e20242007-03-08 00:43:48 +0000587
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom.
    # The buffered class under test is read from ``self.tp``; the raw
    # stream doubles come from ``self.MockRawIO`` / ``self.CloseFailureIO``.

    def test_detach(self):
        # detach() returns the wrapped raw stream; detaching twice fails.
        underlying = self.MockRawIO()
        buffered = self.tp(underlying)
        self.assertIs(buffered.detach(), underlying)
        self.assertRaises(ValueError, buffered.detach)

    def test_fileno(self):
        # The buffered object reports the raw stream's fileno().
        buffered = self.tp(self.MockRawIO())
        self.assertEqual(42, buffered.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        buffered = self.tp(self.MockRawIO())
        # Invalid whence
        for bad_whence in (-1, 3):
            self.assertRaises(ValueError, buffered.seek, 0, bad_whence)

    def test_override_destructor(self):
        base = self.tp
        calls = []

        class MyBufferedIO(base):
            def __del__(self):
                calls.append(1)
                # Chain to the parent's __del__ only when it has one.
                parent_del = getattr(super(), "__del__", None)
                if parent_del is not None:
                    parent_del()

            def close(self):
                calls.append(2)
                super().close()

            def flush(self):
                calls.append(3)
                super().flush()

        buffered = MyBufferedIO(self.MockRawIO())
        writable = buffered.writable()
        del buffered
        support.gc_collect()
        # flush() is only expected for writable buffered objects.
        expected = [1, 2, 3] if writable else [1, 2]
        self.assertEqual(calls, expected)

    def test_context_manager(self):
        # Test usability as a context manager
        buffered = self.tp(self.MockRawIO())

        def enter_and_leave():
            with buffered:
                pass

        enter_and_leave()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, enter_and_leave)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        failing_raw = self.CloseFailureIO()

        def touch_missing_attribute():
            self.tp(failing_raw).xyzzy

        with support.captured_output("stderr") as captured:
            self.assertRaises(AttributeError, touch_missing_attribute)
        message = captured.getvalue().strip()
        if message:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(message.splitlines()), 1)
            self.assertTrue(message.startswith("Exception IOError: "), message)
            self.assertTrue(message.endswith(" ignored"), message)

    def test_repr(self):
        underlying = self.MockRawIO()
        buffered = self.tp(underlying)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(buffered), "<%s>" % clsname)
        # The raw stream's name shows up in the repr, str or bytes alike.
        underlying.name = "dummy"
        self.assertEqual(repr(buffered), "<%s name='dummy'>" % clsname)
        underlying.name = b"dummy"
        self.assertEqual(repr(buffered), "<%s name=b'dummy'>" % clsname)

    def test_flush_error_on_close(self):
        underlying = self.MockRawIO()

        def bad_flush():
            raise IOError()

        underlying.flush = bad_flush
        buffered = self.tp(underlying)
        self.assertRaises(IOError, buffered.close) # exception not swallowed

    def test_multi_close(self):
        buffered = self.tp(self.MockRawIO())
        # Closing repeatedly is harmless; flushing afterwards is an error.
        for _ in range(3):
            buffered.close()
        self.assertRaises(ValueError, buffered.flush)

    def test_readonly_attributes(self):
        buffered = self.tp(self.MockRawIO())
        replacement = self.MockRawIO()
        # The ``raw`` attribute cannot be rebound.
        self.assertRaises(AttributeError, setattr, buffered, "raw", replacement)
702
Guido van Rossum78892e42007-04-06 17:31:18 +0000703
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Implementation-independent tests for a buffered reader.
    #
    # The class under test is read from ``self.tp`` and the helper streams
    # (MockRawIO, MockFileIO, MisbehavedRawIO, open) are read from ``self``;
    # none of them is defined here, so this class is only runnable through
    # subclasses / a harness that provides them.
    read_mode = "rb"

    def test_constructor(self):
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        # __init__ may be called again on an existing object, with or
        # without an explicit buffer size.
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        # Zero and negative buffer sizes are rejected.
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        # After the failed calls, a valid re-initialization on a fresh raw
        # stream must still give a working reader.
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_read(self):
        # read(None) and a read of exactly the available size both return
        # the concatenation of all raw chunks.
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # rawio._reads is checked after every call to pin how many raw
        # reads each read1() is allowed to trigger.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        # A short read only overwrites the first byte; the stale second
        # byte of the target buffer is left in place.
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")

    def test_readlines(self):
        # Each assertion gets a fresh reader over the same three chunks.
        def bufio():
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: [buffer size, sizes passed to bufio.read(),
        #              expected rawio.read_history]
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        # assertIsNone reports the offending value on failure, unlike
        # assertTrue(None is ...).
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

    def test_read_past_eof(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(bytearray(values))
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def reader():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            chunk = bufio.read(n)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as e:
                        # Record the failure for the main thread, then
                        # re-raise so the thread still reports it.
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=reader) for _ in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                combined = b''.join(results)
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(combined.count(c), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            rawio = self.MockRawIO([b"x" * n])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            # Simple case: one raw read is enough to satisfy the request.
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
881
882
class CBufferedReaderTest(BufferedReaderTest):
    # Runs the shared BufferedReaderTest cases against io.BufferedReader
    # and adds checks that only apply to that implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After every rejected __init__ call the object must refuse to
        # read (ValueError).
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        #
        # The FileIO below creates support.TESTFN on disk; register its
        # removal up front so the file is cleaned up even if an assertion
        # fails.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Build a reference cycle so that only the cyclic GC can free it.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000923
class PyBufferedReaderTest(BufferedReaderTest):
    # Binds the shared BufferedReaderTest cases to pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000926
Guido van Rossuma9e20242007-03-08 00:43:48 +0000927
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Implementation-independent tests for a buffered writer.
    #
    # The class under test is read from ``self.tp`` and the helper streams
    # (MockRawIO, MockNonBlockWriterIO, MisbehavedRawIO, open,
    # BlockingIOError) are read from ``self``; none of them is defined
    # here.
    write_mode = "wb"

    def test_constructor(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # __init__ may be called again on an existing object.
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        # Zero and negative buffer sizes are rejected.
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        # A valid re-initialization afterwards gives a working writer on
        # the same raw stream.
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # detach() must push buffered data to the raw stream first.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Helper (not a test): performs many writes of growing size, calling
        # intermediate_func(bufio) after each one, and checks that the raw
        # stream ends up with exactly the written contents.
        # Lots of writes, test the flushed output is as expected.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            # Clamp the final write to what is left of ``contents``.
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Both helpers move the position away and then restore it, so the
        # written output must be unaffected.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_destructor(self):
        # Dropping the last reference must flush pending data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # The test creates support.TESTFN; make sure it is removed again
        # whatever the outcome.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def writer():
                    try:
                        while True:
                            try:
                                chunk = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(chunk)
                    except Exception as e:
                        # Record the failure for the main thread, then
                        # re-raise so the thread still reports it.
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=writer) for _ in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as written:
                data = written.read()
            for i in range(256):
                self.assertEqual(data.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing a third positional argument must emit exactly one
        # DeprecationWarning with a fixed message.
        with support.check_warnings() as w:
            warnings.simplefilter("always", DeprecationWarning)
            self.tp(self.MockRawIO(), 8, 12)
            self.assertEqual(len(w.warnings), 1)
            warning = w.warnings[0]
            self.assertIs(warning.category, DeprecationWarning)
            self.assertEqual(str(warning.message),
                             "max_buffer_size is deprecated")
1147
1148
class CBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriterTest cases against io.BufferedWriter
    # and adds checks that only apply to that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After every rejected __init__ call the object must refuse to
        # write (ValueError).
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        #
        # The FileIO below creates support.TESTFN on disk; register its
        # removal up front so the file is cleaned up even if an assertion
        # fails.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Build a reference cycle so that only the cyclic GC can free it.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1186
1187
class PyBufferedWriterTest(BufferedWriterTest):
    # Binds the shared BufferedWriterTest cases to pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001190
class BufferedRWPairTest(unittest.TestCase):
    # Implementation-independent tests for a buffered reader/writer pair.
    #
    # The class under test is read from ``self.tp``; MockRawIO, BytesIO and
    # UnsupportedOperation are read from ``self`` and are not defined here.

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_detach(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_deprecation(self):
        # Passing a fourth positional argument must emit exactly one
        # DeprecationWarning with a fixed message.
        with support.check_warnings() as w:
            warnings.simplefilter("always", DeprecationWarning)
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
            self.assertEqual(len(w.warnings), 1)
            warning = w.warnings[0]
            self.assertIs(warning.category, DeprecationWarning)
            self.assertEqual(str(warning.message),
                             "max_buffer_size is deprecated")

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        # read(None) behaves like read() with no argument.
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        # Each assertion gets a fresh pair over the same data.  The three
        # calls mirror BufferedReaderTest.test_readlines: no hint, an
        # explicit None hint, and a small hint that stops after two lines.
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        # Each flush() must hand the pending bytes to the writer side.
        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # peek() may return more than requested, hence startswith(); the
        # following read() shows that peek() did not consume anything.
        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        # Raw stream whose isatty() answer is chosen at construction time.
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # The pair reports a tty as soon as either side does.
        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001313
class CBufferedRWPairTest(BufferedRWPairTest):
    # Binds the shared BufferedRWPairTest cases to io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001316
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Binds the shared BufferedRWPairTest cases to pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001319
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001320
1321class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1322 read_mode = "rb+"
1323 write_mode = "wb+"
1324
1325 def test_constructor(self):
1326 BufferedReaderTest.test_constructor(self)
1327 BufferedWriterTest.test_constructor(self)
1328
1329 def test_read_and_write(self):
1330 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001331 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001332
1333 self.assertEqual(b"as", rw.read(2))
1334 rw.write(b"ddd")
1335 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001336 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001337 self.assertEqual(b"ghjk", rw.read())
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001338 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001339
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001340 def test_seek_and_tell(self):
1341 raw = self.BytesIO(b"asdfghjkl")
1342 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001343
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001344 self.assertEqual(b"as", rw.read(2))
1345 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001346 rw.seek(0, 0)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001347 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00001348
1349 rw.write(b"asdf")
1350 rw.seek(0, 0)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001351 self.assertEqual(b"asdfasdfl", rw.read())
1352 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001353 rw.seek(-4, 2)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001354 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001355 rw.seek(2, 1)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001356 self.assertEqual(7, rw.tell())
1357 self.assertEqual(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001358 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001359
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001360 def check_flush_and_read(self, read_func):
1361 raw = self.BytesIO(b"abcdefghi")
1362 bufio = self.tp(raw)
1363
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001364 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001365 bufio.write(b"12")
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001366 self.assertEqual(b"ef", read_func(bufio, 2))
1367 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001368 bufio.flush()
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001369 self.assertEqual(6, bufio.tell())
1370 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001371 raw.seek(0, 0)
1372 raw.write(b"XYZ")
1373 # flush() resets the read buffer
1374 bufio.flush()
1375 bufio.seek(0, 0)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001376 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001377
1378 def test_flush_and_read(self):
1379 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1380
1381 def test_flush_and_readinto(self):
1382 def _readinto(bufio, n=-1):
1383 b = bytearray(n if n >= 0 else 9999)
1384 n = bufio.readinto(b)
1385 return bytes(b[:n])
1386 self.check_flush_and_read(_readinto)
1387
1388 def test_flush_and_peek(self):
1389 def _peek(bufio, n=-1):
1390 # This relies on the fact that the buffer can contain the whole
1391 # raw stream, otherwise peek() can return less.
1392 b = bufio.peek(n)
1393 if n != -1:
1394 b = b[:n]
1395 bufio.seek(len(b), 1)
1396 return b
1397 self.check_flush_and_read(_peek)
1398
1399 def test_flush_and_write(self):
1400 raw = self.BytesIO(b"abcdefghi")
1401 bufio = self.tp(raw)
1402
1403 bufio.write(b"123")
1404 bufio.flush()
1405 bufio.write(b"45")
1406 bufio.flush()
1407 bufio.seek(0, 0)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001408 self.assertEqual(b"12345fghi", raw.getvalue())
1409 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001410
def test_threads(self):
    # Reuse the reader and writer threading tests on this class.
    BufferedReaderTest.test_threads(self)
    BufferedWriterTest.test_threads(self)
1414
def test_writes_and_peek(self):
    # Run check_writes() twice, interleaving the writes with two
    # different peek() patterns.
    def _peek(bufio):
        bufio.peek(1)
    self.check_writes(_peek)
    def _peek(bufio):
        # Step back one byte, peek, then restore the saved position.
        pos = bufio.tell()
        bufio.seek(-1, 1)
        bufio.peek(1)
        bufio.seek(pos, 0)
    self.check_writes(_peek)
1425
def test_writes_and_reads(self):
    # Interleave check_writes() with "step back one byte, read(1)".
    def _read(bufio):
        bufio.seek(-1, 1)
        bufio.read(1)
    self.check_writes(_read)
1431
def test_writes_and_read1s(self):
    # Interleave check_writes() with "step back one byte, read1(1)".
    def _read1(bufio):
        bufio.seek(-1, 1)
        bufio.read1(1)
    self.check_writes(_read1)
1437
def test_writes_and_readintos(self):
    # Interleave check_writes() with "step back one byte, readinto()"
    # into a one-byte buffer.
    def _read(bufio):
        bufio.seek(-1, 1)
        bufio.readinto(bytearray(1))
    self.check_writes(_read)
1443
def test_write_after_readahead(self):
    # Issue #6629: writing after the buffer was filled by readahead should
    # first rewind the raw stream.
    for overwrite_size in [1, 5]:
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 4)
        # Trigger readahead
        self.assertEqual(bufio.read(1), b"A")
        self.assertEqual(bufio.tell(), 1)
        # Overwriting should rewind the raw stream if it needs so
        bufio.write(b"B" * overwrite_size)
        self.assertEqual(bufio.tell(), overwrite_size + 1)
        # If the write size was smaller than the buffer size, flush() and
        # check that rewind happens.
        bufio.flush()
        self.assertEqual(bufio.tell(), overwrite_size + 1)
        s = raw.getvalue()
        self.assertEqual(s,
            b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1463
def test_write_rewind_write(self):
    # Various combinations of reading / writing / seeking backwards /
    # writing again
    def mutate(bufio, pos1, pos2):
        assert pos2 >= pos1
        # Fill the buffer
        bufio.seek(pos1)
        bufio.read(pos2 - pos1)
        bufio.write(b'\x02')
        # This writes earlier than the previous write, but still inside
        # the buffer.
        bufio.seek(pos1)
        bufio.write(b'\x01')

    b = b"\x80\x81\x82\x83\x84"
    for i in range(0, len(b)):
        for j in range(i, len(b)):
            raw = self.BytesIO(b)
            bufio = self.tp(raw, 100)
            mutate(bufio, i, j)
            bufio.flush()
            # Same assignment order as mutate(): when i == j the later
            # write of 1 wins.
            expected = bytearray(b)
            expected[j] = 2
            expected[i] = 1
            self.assertEqual(raw.getvalue(), expected,
                             "failed result for i=%d, j=%d" % (i, j))
1489
def test_truncate_after_read_or_write(self):
    # truncate() with no argument must report the logical position after
    # a buffered read and after a buffered write.
    raw = self.BytesIO(b"A" * 10)
    bufio = self.tp(raw, 100)
    self.assertEqual(bufio.read(2), b"AA")  # the read buffer gets filled
    self.assertEqual(bufio.truncate(), 2)
    self.assertEqual(bufio.write(b"BB"), 2)  # the write buffer increases
    self.assertEqual(bufio.truncate(), 4)
1497
def test_misbehaved_io(self):
    # Reuse the reader and writer misbehaved-raw-stream tests.
    BufferedReaderTest.test_misbehaved_io(self)
    BufferedWriterTest.test_misbehaved_io(self)
1501
class CBufferedRandomTest(BufferedRandomTest):
    # Runs the BufferedRandom tests against io.BufferedRandom.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_garbage_collection(self):
        # Reuse the reader and writer garbage-collection tests.
        CBufferedReaderTest.test_garbage_collection(self)
        CBufferedWriterTest.test_garbage_collection(self)
1518
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the BufferedRandom tests against pyio.BufferedRandom.
    tp = pyio.BufferedRandom
1521
1522
# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
# properties:
#   - A single output character can correspond to many bytes of input.
#   - The number of input bytes to complete the character can be
#     undetermined until the last input byte is received.
#   - The number of input bytes can vary depending on previous input.
#   - A single input byte can correspond to many characters of output.
#   - The number of output characters can be undetermined until the
#     last input byte is received.
#   - The number of output characters can vary depending on previous input.
1533
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  A change to I applies to the bytes
    that follow the 'i' word.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Restore I = O = 1 and drop any buffered input."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags) with I and O packed as I*100 + O."""
        i, o = self.i ^ 1, self.o ^ 1  # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously returned by getstate()."""
        buffer, io = state
        self.buffer = bytearray(buffer)
        i, o = divmod(io, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Consume the bytes of *input* and return the words completed."""
        pieces = []
        for b in input:
            if self.i == 0:  # variable-length, terminated with period
                if b == ord('.'):
                    if self.buffer:
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else:  # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer:  # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Interpret the buffered word, clear the buffer, return its output.

        'i' and 'o' words update the lengths and produce no output.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o  # pad out with hyphens
            if self.o:
                output = output[:self.o]  # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # Flag read by lookupTestDecoder(); False until a test switches it on.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: resolve 'test_decoder' while enabled.

        Returns None for any other name, or while codecEnabled is false.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None
1618
# Register the previous decoder for testing.
# Disabled by default, tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1622
1623
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag passed to decode(), expected
    # output).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for input, eof, output in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(input, eof), output)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001666
1667class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001668
def setUp(self):
    # Shared fixtures: mixed-newline bytes, the same text with "\n" line
    # endings, and a TESTFN that does not exist yet.
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
    support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001673
def tearDown(self):
    # Remove the scratch file a test may have created.
    support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001676
def test_constructor(self):
    # Calling __init__() again must update encoding and line_buffering,
    # and invalid newline arguments must be rejected.
    r = self.BytesIO(b"\xc3\xa9\n\n")
    b = self.BufferedReader(r, 1000)
    t = self.TextIOWrapper(b)
    t.__init__(b, encoding="latin1", newline="\r\n")
    self.assertEqual(t.encoding, "latin1")
    self.assertEqual(t.line_buffering, False)
    t.__init__(b, encoding="utf8", line_buffering=True)
    self.assertEqual(t.encoding, "utf8")
    self.assertEqual(t.line_buffering, True)
    self.assertEqual("\xe9\n", t.readline())
    self.assertRaises(TypeError, t.__init__, b, newline=42)
    self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1690
def test_detach(self):
    # detach() returns the underlying buffer, flushes pending text, and
    # raises ValueError when called a second time.
    r = self.BytesIO()
    b = self.BufferedWriter(r)
    t = self.TextIOWrapper(b)
    self.assertIs(t.detach(), b)

    t = self.TextIOWrapper(b, encoding="ascii")
    t.write("howdy")
    self.assertFalse(r.getvalue())
    t.detach()
    self.assertEqual(r.getvalue(), b"howdy")
    self.assertRaises(ValueError, t.detach)
1703
def test_repr(self):
    # repr() shows the encoding, plus the raw stream's name once it has
    # one (str or bytes).
    raw = self.BytesIO("hello".encode("utf-8"))
    b = self.BufferedReader(raw)
    t = self.TextIOWrapper(b, encoding="utf-8")
    modname = self.TextIOWrapper.__module__
    self.assertEqual(repr(t),
                     "<%s.TextIOWrapper encoding='utf-8'>" % modname)
    raw.name = "dummy"
    self.assertEqual(repr(t),
                     "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
    raw.name = b"dummy"
    self.assertEqual(repr(t),
                     "<%s.TextIOWrapper name=b'dummy' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001717
def test_line_buffering(self):
    # With line_buffering=True, data reaches the raw stream only once a
    # write contains "\n" or "\r".
    r = self.BytesIO()
    b = self.BufferedWriter(r, 1000)
    t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
    t.write("X")
    self.assertEqual(r.getvalue(), b"")  # No flush happened
    t.write("Y\nZ")
    self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed
    t.write("A\rB")
    self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001728
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="utf8")
    self.assertEqual(t.encoding, "utf8")
    t = self.TextIOWrapper(b)
    self.assertIsNotNone(t.encoding)
    # lookup() raises LookupError for an unknown codec name.
    codecs.lookup(t.encoding)
1737
def test_encoding_errors_reading(self):
    # Reading a non-ASCII byte under each errors= policy.
    # (1) default
    b = self.BytesIO(b"abc\n\xff\n")
    t = self.TextIOWrapper(b, encoding="ascii")
    self.assertRaises(UnicodeError, t.read)
    # (2) explicit strict
    b = self.BytesIO(b"abc\n\xff\n")
    t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
    self.assertRaises(UnicodeError, t.read)
    # (3) ignore
    b = self.BytesIO(b"abc\n\xff\n")
    t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
    self.assertEqual(t.read(), "abc\n\n")
    # (4) replace
    b = self.BytesIO(b"abc\n\xff\n")
    t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
    self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001755
def test_encoding_errors_writing(self):
    # Writing a non-ASCII character under each errors= policy.
    # (1) default
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="ascii")
    self.assertRaises(UnicodeError, t.write, "\xff")
    # (2) explicit strict
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
    self.assertRaises(UnicodeError, t.write, "\xff")
    # (3) ignore
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
                           newline="\n")
    t.write("abc\xffdef\n")
    t.flush()
    self.assertEqual(b.getvalue(), b"abcdef\n")
    # (4) replace
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
                           newline="\n")
    t.write("abc\xffdef\n")
    t.flush()
    self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001779
def test_newlines(self):
    # Line splitting for every newline= mode, over several encodings and
    # small buffer sizes, via both iteration and read(2) + readline().
    input_lines = ["unix\n", "windows\r\n", "os9\r", "last\n", "nonl"]

    # [newline argument, expected lines]
    tests = [
        [None, ['unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl']],
        ['', input_lines],
        ['\n', ["unix\n", "windows\r\n", "os9\rlast\n", "nonl"]],
        ['\r\n', ["unix\nwindows\r\n", "os9\rlast\nnonl"]],
        ['\r', ["unix\nwindows\r", "\nos9\r", "last\nnonl"]],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        data = ''.join(input_lines).encode(encoding)
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    if do_reads:
                        got_lines = []
                        while True:
                            c2 = textio.read(2)
                            if c2 == '':
                                break
                            self.assertEqual(len(c2), 2)
                            got_lines.append(c2 + textio.readline())
                    else:
                        got_lines = list(textio)

                    for got_line, exp_line in zip(got_lines, exp_lines):
                        self.assertEqual(got_line, exp_line)
                    self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00001821
def test_newlines_input(self):
    # readlines() and read() must agree with the expected splitting for
    # each newline= mode.
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    for newline, expected in [
        (None, normalized.decode("ascii").splitlines(True)),
        ("", testdata.decode("ascii").splitlines(True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
        ]:
        buf = self.BytesIO(testdata)
        txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
        self.assertEqual(txt.readlines(), expected)
        txt.seek(0)
        self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00001837
def test_newlines_output(self):
    # Expected raw bytes for each newline= mode; newline=None is checked
    # against the entry for os.linesep.
    testdict = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
        }
    tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
    for newline, expected in tests:
        buf = self.BytesIO()
        txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
        txt.write("AAA\nB")
        txt.write("BB\nCCC\n")
        txt.write("X\rY\r\nZ")
        txt.flush()
        self.assertEqual(buf.closed, False)
        self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001855
def test_destructor(self):
    # Dropping the last reference to the wrapper must close the
    # underlying stream after the pending text has been written to it.
    l = []
    base = self.BytesIO
    class MyBytesIO(base):
        def close(self):
            # Record the contents seen at close time.
            l.append(self.getvalue())
            base.close(self)
    b = MyBytesIO()
    t = self.TextIOWrapper(b, encoding="ascii")
    t.write("abc")
    del t
    support.gc_collect()
    self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001869
def test_override_destructor(self):
    # Destruction must invoke the overridden __del__, close and flush,
    # in that order.
    record = []
    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            record.append(1)
            # The base class may or may not define __del__.
            try:
                f = super().__del__
            except AttributeError:
                pass
            else:
                f()
        def close(self):
            record.append(2)
            super().close()
        def flush(self):
            record.append(3)
            super().flush()
    b = self.BytesIO()
    t = MyTextIO(b, encoding="ascii")
    del t
    support.gc_collect()
    self.assertEqual(record, [1, 2, 3])
1892
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()
    def f():
        self.TextIOWrapper(rawio).xyzzy
    with support.captured_output("stderr") as s:
        self.assertRaises(AttributeError, f)
    s = s.getvalue().strip()
    if s:
        # The destructor *may* have printed an unraisable error, check it
        self.assertEqual(len(s.splitlines()), 1)
        self.assertTrue(s.startswith("Exception IOError: "), s)
        self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00001907
Guido van Rossum9b76da62007-04-11 01:09:03 +00001908 # Systematic tests of the text I/O API
1909
def test_basic_io(self):
    # Basic write/read/seek/tell round trips on a real file, for several
    # chunk sizes and encodings.  The files are opened with "with" so a
    # failing assertion does not leave TESTFN open.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin1", "utf8":  # , "utf-16-be", "utf-16-le":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
1938
def multi_line_test(self, f, enc):
    # Empty f, write lines of various lengths while recording tell()
    # before each one, then read them back and check that the
    # (position, line) pairs match.  enc is accepted but not used here.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    wlines = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to build a line of `size`.
        line = "".join(sample[i % len(sample)] for i in range(size)) + "\n"
        wlines.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    rlines = []
    while True:
        pos = f.tell()
        line = f.readline()
        if not line:
            break
        rlines.append((pos, line))
    self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001960
def test_telling(self):
    # tell() values recorded while writing must match those seen while
    # reading back, and tell() must raise IOError during iteration.
    # "with" closes the file even if an assertion fails.
    with self.open(support.TESTFN, "w+", encoding="utf8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            self.assertRaises(IOError, f.tell)
        self.assertEqual(f.tell(), p2)
1980
def test_seeking(self):
    # Place a multi-byte character two bytes before the default chunk
    # size and check read()/tell()/readline() around it.
    chunk_size = _default_chunk_size()
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = bytes(u_prefix.encode("utf-8"))
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = bytes(u_suffix.encode("utf-8"))
    line = prefix + suffix
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    # The original never closed this handle; "with" closes it before
    # tearDown() unlinks TESTFN.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        s = f.read(prefix_size)
        self.assertEqual(s, str(prefix, "ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00001998
def test_seeking_too(self):
    # Regression test for a specific bug
    data = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as f:
        f.write(data)
    # The original never closed this handle; "with" closes it before
    # tearDown() unlinks TESTFN.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        f.tell()
2010
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN, encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = True

    # Run the tests.
    try:
        # Try each test case.
        for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(input)

        # Position each test case so that it crosses a chunk boundary.
        for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(input)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + input, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = False
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002057
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002058 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002059 data = "1234567890"
2060 tests = ("utf-16",
2061 "utf-16-le",
2062 "utf-16-be",
2063 "utf-32",
2064 "utf-32-le",
2065 "utf-32-be")
2066 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002067 buf = self.BytesIO()
2068 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002069 # Check if the BOM is written only once (see issue1753).
2070 f.write(data)
2071 f.write(data)
2072 f.seek(0)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002073 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002074 f.seek(0)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002075 self.assertEqual(f.read(), data * 2)
2076 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002077
Benjamin Petersona1b49012009-03-31 23:11:32 +00002078 def test_unreadable(self):
2079 class UnReadable(self.BytesIO):
2080 def readable(self):
2081 return False
2082 txt = self.TextIOWrapper(UnReadable())
2083 self.assertRaises(IOError, txt.read)
2084
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002085 def test_read_one_by_one(self):
2086 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002087 reads = ""
2088 while True:
2089 c = txt.read(1)
2090 if not c:
2091 break
2092 reads += c
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002093 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002094
Benjamin Peterson6b59f772009-12-13 19:30:15 +00002095 def test_readlines(self):
2096 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2097 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2098 txt.seek(0)
2099 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2100 txt.seek(0)
2101 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2102
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002103 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002104 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002105 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002106 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002107 reads = ""
2108 while True:
2109 c = txt.read(128)
2110 if not c:
2111 break
2112 reads += c
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002113 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002114
2115 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002116 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002117
2118 # read one char at a time
2119 reads = ""
2120 while True:
2121 c = txt.read(1)
2122 if not c:
2123 break
2124 reads += c
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002125 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002126
2127 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002128 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002129 txt._CHUNK_SIZE = 4
2130
2131 reads = ""
2132 while True:
2133 c = txt.read(4)
2134 if not c:
2135 break
2136 reads += c
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002137 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002138
2139 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002140 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002141 txt._CHUNK_SIZE = 4
2142
2143 reads = txt.read(4)
2144 reads += txt.read(4)
2145 reads += txt.readline()
2146 reads += txt.readline()
2147 reads += txt.readline()
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002148 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002149
2150 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002151 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002152 txt._CHUNK_SIZE = 4
2153
2154 reads = txt.read(4)
2155 reads += txt.read()
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002156 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002157
2158 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002159 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002160 txt._CHUNK_SIZE = 4
2161
2162 reads = txt.read(4)
2163 pos = txt.tell()
2164 txt.seek(0)
2165 txt.seek(pos)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002166 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002167
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002168 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002169 buffer = self.BytesIO(self.testdata)
2170 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002171
2172 self.assertEqual(buffer.seekable(), txt.seekable())
2173
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    path = support.TESTFN

    def stored_bytes():
        # Return the raw on-disk contents of the test file.
        with self.open(path, 'rb') as raw:
            return raw.read()

    for codec in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=codec) as f:
            f.write('aaa')
            # The position is queried but its value is not used here.
            f.tell()
        self.assertEqual(stored_bytes(), 'aaa'.encode(codec))

        with self.open(path, 'a', encoding=codec) as f:
            f.write('xxx')
        self.assertEqual(stored_bytes(), 'aaaxxx'.encode(codec))
Antoine Pitroue4501852009-05-14 18:55:55 +00002188
def test_seek_bom(self):
    # Same test, but when seeking manually
    path = support.TESTFN
    for codec in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=codec) as f:
            f.write('aaa')
            end_of_first_write = f.tell()
        with self.open(path, 'r+', encoding=codec) as f:
            # Write after the existing text first, then overwrite the
            # start of the file.
            for target, text in ((end_of_first_write, 'zzz'), (0, 'bbb')):
                f.seek(target)
                f.write(text)
        with self.open(path, 'rb') as f:
            on_disk = f.read()
            self.assertEqual(on_disk, 'bbbzzz'.encode(codec))
Antoine Pitroue4501852009-05-14 18:55:55 +00002203
def test_errors_property(self):
    # The "errors" attribute reflects the handler given to open(), and is
    # "strict" when none is given.
    scenarios = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for extra_kwargs, expected in scenarios:
        with self.open(support.TESTFN, "w", **extra_kwargs) as f:
            self.assertEqual(f.errors, expected)
2209
Antoine Pitroue4501852009-05-14 18:55:55 +00002210
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    start_signal = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def run(n):
            line = "Thread%03d\n" % n
            # Hold every writer until all threads have been started.
            start_signal.wait()
            f.write(line)
        workers = []
        for index in range(20):
            workers.append(threading.Thread(target=run, args=(index,)))
        for worker in workers:
            worker.start()
        time.sleep(0.02)
        start_signal.set()
        for worker in workers:
            worker.join()
    with self.open(support.TESTFN) as f:
        written = f.read()
        # Each thread's line must appear exactly once.
        for index in range(20):
            occurrences = written.count("Thread%03d\n" % index)
            self.assertEqual(occurrences, 1)
Amaury Forgeot d'Arcaf0312a2009-08-29 23:19:16 +00002231
Antoine Pitroufaf90072010-05-03 16:58:19 +00002232 def test_flush_error_on_close(self):
2233 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2234 def bad_flush():
2235 raise IOError()
2236 txt.flush = bad_flush
2237 self.assertRaises(IOError, txt.close) # exception not swallowed
2238
2239 def test_multi_close(self):
2240 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2241 txt.close()
2242 txt.close()
2243 txt.close()
2244 self.assertRaises(ValueError, txt.flush)
2245
Antoine Pitrou6cfc5122010-12-21 21:26:09 +00002246 def test_readonly_attributes(self):
2247 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2248 buf = self.BytesIO(self.testdata)
2249 with self.assertRaises(AttributeError):
2250 txt.buffer = buf
2251
class CTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapperTest plus checks specific to the C TextIOWrapper.

    The I/O classes used here (BytesIO, BufferedReader, FileIO, ...) are
    read from attributes of the test class, which test_main() fills in.
    """

    def test_initialization(self):
        # Re-running __init__ with an invalid "newline" argument must fail,
        # and the wrapper must then refuse to read.
        r = self.BytesIO(b"\xc3\xa9\n\n")
        b = self.BufferedReader(r, 1000)
        t = self.TextIOWrapper(b)
        self.assertRaises(TypeError, t.__init__, b, newline=42)
        self.assertRaises(ValueError, t.read)
        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
        self.assertRaises(ValueError, t.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        # FileIO is taken from the test class like every other I/O type in
        # this method, rather than from the global io module.
        rawio = self.FileIO(support.TESTFN, "wb")
        b = self.BufferedWriter(rawio)
        t = self.TextIOWrapper(b, encoding="ascii")
        t.write("456def")
        # Create a reference cycle so only the cyclic collector can free it.
        t.x = t
        wr = weakref.ref(t)
        del t
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")
2278
class PyTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapperTest with no additions; test_main() binds the
    pure-Python (pyio) names onto classes whose name starts with "Py"."""
2281
2282
2283class IncrementalNewlineDecoderTest(unittest.TestCase):
2284
2285 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002286 # UTF-8 specific tests for a newline decoder
2287 def _check_decode(b, s, **kwargs):
2288 # We exercise getstate() / setstate() as well as decode()
2289 state = decoder.getstate()
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002290 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002291 decoder.setstate(state)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002292 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002293
Antoine Pitrou180a3362008-12-14 16:36:46 +00002294 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002295
Antoine Pitrou180a3362008-12-14 16:36:46 +00002296 _check_decode(b'\xe8', "")
2297 _check_decode(b'\xa2', "")
2298 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002299
Antoine Pitrou180a3362008-12-14 16:36:46 +00002300 _check_decode(b'\xe8', "")
2301 _check_decode(b'\xa2', "")
2302 _check_decode(b'\x88', "\u8888")
2303
2304 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002305 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2306
Antoine Pitrou180a3362008-12-14 16:36:46 +00002307 decoder.reset()
2308 _check_decode(b'\n', "\n")
2309 _check_decode(b'\r', "")
2310 _check_decode(b'', "\n", final=True)
2311 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002312
Antoine Pitrou180a3362008-12-14 16:36:46 +00002313 _check_decode(b'\r', "")
2314 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002315
Antoine Pitrou180a3362008-12-14 16:36:46 +00002316 _check_decode(b'\r\r\n', "\n\n")
2317 _check_decode(b'\r', "")
2318 _check_decode(b'\r', "\n")
2319 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002320
Antoine Pitrou180a3362008-12-14 16:36:46 +00002321 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2322 _check_decode(b'\xe8\xa2\x88', "\u8888")
2323 _check_decode(b'\n', "\n")
2324 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2325 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002326
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002327 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002328 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002329 if encoding is not None:
2330 encoder = codecs.getincrementalencoder(encoding)()
2331 def _decode_bytewise(s):
2332 # Decode one byte at a time
2333 for b in encoder.encode(s):
2334 result.append(decoder.decode(bytes([b])))
2335 else:
2336 encoder = None
2337 def _decode_bytewise(s):
2338 # Decode one char at a time
2339 for c in s:
2340 result.append(decoder.decode(c))
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002341 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002342 _decode_bytewise("abc\n\r")
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002343 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00002344 _decode_bytewise("\nabc")
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002345 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002346 _decode_bytewise("abc\r")
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002347 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002348 _decode_bytewise("abc")
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002349 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002350 _decode_bytewise("abc\r")
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002351 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002352 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002353 input = "abc"
2354 if encoder is not None:
2355 encoder.reset()
2356 input = encoder.encode(input)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002357 self.assertEqual(decoder.decode(input), "abc")
2358 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002359
2360 def test_newline_decoder(self):
2361 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002362 # None meaning the IncrementalNewlineDecoder takes unicode input
2363 # rather than bytes input
2364 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002365 'utf-16', 'utf-16-le', 'utf-16-be',
2366 'utf-32', 'utf-32-le', 'utf-32-be',
2367 )
2368 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002369 decoder = enc and codecs.getincrementaldecoder(enc)()
2370 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2371 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002372 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002373 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2374 self.check_newline_decoding_utf8(decoder)
2375
Antoine Pitrou66913e22009-03-06 23:40:56 +00002376 def test_newline_bytes(self):
2377 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2378 def _check(dec):
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002379 self.assertEqual(dec.newlines, None)
2380 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2381 self.assertEqual(dec.newlines, None)
2382 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2383 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00002384 dec = self.IncrementalNewlineDecoder(None, translate=False)
2385 _check(dec)
2386 dec = self.IncrementalNewlineDecoder(None, translate=True)
2387 _check(dec)
2388
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoderTest with no additions; test_main() binds
    the names of the io module onto classes whose name starts with "C"."""
2391
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoderTest with no additions; test_main() binds
    the names of the pyio module onto classes whose name starts with "Py"."""
Antoine Pitrou180a3362008-12-14 16:36:46 +00002394
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002395
Guido van Rossum01a27522007-03-07 01:00:12 +00002396# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002397
class MiscIOTest(unittest.TestCase):
    """Miscellaneous checks on the module-level API of an io implementation.

    Concrete subclasses set the "io" class attribute to the module under
    test; the other names used through self (open, IOBase, BlockingIOError,
    ...) are attached to the subclasses by test_main().
    """

    def tearDown(self):
        support.unlink(support.TESTFN)

    def test___all__(self):
        # Every exported name must exist; exception-like names must be
        # Exception subclasses, and everything else except open() and the
        # SEEK_* constants must derive from IOBase.
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertIsNotNone(obj, name)
            if name == "open":
                continue
            elif "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase))

    def test_attributes(self):
        # Files are opened in "with" blocks so they are closed even when an
        # assertion fails part-way through.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        with self.open(support.TESTFN, "U") as f:
            self.assertEqual(f.name, support.TESTFN)
            self.assertEqual(f.buffer.name, support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode, "U")
            self.assertEqual(f.buffer.mode, "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        with self.open(support.TESTFN, "w+") as f:
            self.assertEqual(f.mode, "w+")
            self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
            self.assertEqual(f.buffer.raw.mode, "rb+")

            # Second file object on the same descriptor; closefd=False
            # leaves closing the descriptor to f.
            with self.open(f.fileno(), "wb", closefd=False) as g:
                self.assertEqual(g.mode, "wb")
                self.assertEqual(g.raw.mode, "wb")
                self.assertEqual(g.name, f.fileno())
                self.assertEqual(g.raw.name, f.fileno())

    def test_io_after_close(self):
        # Every I/O operation on a closed file must raise ValueError, for
        # each combination of mode and buffering below.
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            # write() needs bytes for binary modes and str for text modes.
            self.assertRaises(ValueError, f.write,
                              b"" if "b" in kwargs['mode'] else "")
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        self.assertRaises(TypeError, self.BlockingIOError)
        self.assertRaises(TypeError, self.BlockingIOError, 1)
        self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
        self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
        b = self.BlockingIOError(1, "")
        self.assertEqual(b.characters_written, 0)
        class C(str):
            pass
        c = C("")
        b = self.BlockingIOError(1, c)
        # Build a reference cycle between the exception and its argument,
        # then check that it is collected.
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        self.assertTrue(isinstance(self.IOBase, abc.ABCMeta))
        self.assertTrue(isinstance(self.RawIOBase, abc.ABCMeta))
        self.assertTrue(isinstance(self.BufferedIOBase, abc.ABCMeta))
        self.assertTrue(isinstance(self.TextIOBase, abc.ABCMeta))

    def _check_abc_inheritance(self, abcmodule):
        # abcmodule is any object exposing IOBase, RawIOBase, BufferedIOBase
        # and TextIOBase attributes.
        # Unbuffered binary file: raw only.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertTrue(isinstance(f, abcmodule.IOBase))
            self.assertTrue(isinstance(f, abcmodule.RawIOBase))
            self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
            self.assertFalse(isinstance(f, abcmodule.TextIOBase))
        # Buffered binary file: buffered only.
        with self.open(support.TESTFN, "wb") as f:
            self.assertTrue(isinstance(f, abcmodule.IOBase))
            self.assertFalse(isinstance(f, abcmodule.RawIOBase))
            self.assertTrue(isinstance(f, abcmodule.BufferedIOBase))
            self.assertFalse(isinstance(f, abcmodule.TextIOBase))
        # Text file: text only.
        with self.open(support.TESTFN, "w") as f:
            self.assertTrue(isinstance(f, abcmodule.IOBase))
            self.assertFalse(isinstance(f, abcmodule.RawIOBase))
            self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
            self.assertTrue(isinstance(f, abcmodule.TextIOBase))

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)
2533
class CMiscIOTest(MiscIOTest):
    """MiscIOTest bound to the io module."""

    io = io
2536
class PyMiscIOTest(MiscIOTest):
    """MiscIOTest bound to the pyio module."""

    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002539
Antoine Pitrou16b11de2010-08-21 19:17:57 +00002540
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Checks how writes interact with signal handlers.

    Concrete subclasses set the "io" class attribute to the module under
    test. Each test arms SIGALRM with signal.alarm(); the alarm is always
    cancelled again before the test returns so that it cannot fire after
    tearDown() has restored the previous handler.
    """

    def setUp(self):
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Deliberately raise ZeroDivisionError from the signal handler.
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler."""
        read_results = []
        def _read():
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        # Stays None if open() fails, so the cleanup below can tell.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            signal.alarm(1)
            try:
                # Fill the pipe enough that the write will be blocking.
                # It will be interrupted by the timer armed above. Since the
                # other thread has read one byte, the low-level write will
                # return with a successful (partial) result rather than an
                # EINTR. The buffered IO layer must check for pending signal
                # handlers, which in this case will invoke alarm_interrupt().
                self.assertRaises(ZeroDivisionError,
                                  wio.write, item * (1024 * 1024))
            finally:
                # Cancel the alarm in case the write ended some other way.
                signal.alarm(0)
            t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    def check_reentrant_write(self, data, **fdopen_kwargs):
        # Keep writing "data" until the alarm handler, which itself writes
        # to the same file object, interrupts the loop.
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)):
                while 1:
                    for _ in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
        finally:
            # Cancel the alarm in case the loop ended some other way.
            signal.alarm(0)
            try:
                wio.close()
            finally:
                # Close the read end even if closing wio raised.
                os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")
2630
2631
class CSignalsTest(SignalsTest):
    """SignalsTest bound to the io module."""

    io = io
2634
class PySignalsTest(SignalsTest):
    """SignalsTest bound to the pyio module, with the reentrancy tests
    switched off."""

    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = None
    test_reentrant_write_text = None
2642
Antoine Pitrou16b11de2010-08-21 19:17:57 +00002643
def test_main():
    # Collect the test classes, attach the matching io namespace to each
    # "C..." / "Py..." class, then run everything.
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {}
    for member in all_members:
        c_io_ns[member] = getattr(io, member)
    py_io_ns = {}
    for member in all_members:
        py_io_ns[member] = getattr(pyio, member)
    module_globals = globals()
    # Each mock is exposed under its base name, resolved to the "C"- or
    # "Py"-prefixed variant defined in this module.
    for mock in mocks:
        c_io_ns[mock.__name__] = module_globals["C" + mock.__name__]
    for mock in mocks:
        py_io_ns[mock.__name__] = module_globals["Py" + mock.__name__]
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        class_name = test.__name__
        if class_name.startswith("C"):
            namespace = c_io_ns
        elif class_name.startswith("Py"):
            namespace = py_io_ns
        else:
            # Implementation-independent test class: nothing to attach.
            continue
        for attr_name, attr_value in namespace.items():
            setattr(test, attr_name, attr_value)

    support.run_unittest(*tests)


if __name__ == "__main__":
    test_main()