blob: acf90df0a8dbbc362fd9ed342f1cc321dd930daf [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import threading
27import random
Guido van Rossum28524c72007-02-27 05:47:44 +000028import unittest
Benjamin Peterson59406a92009-03-26 17:10:29 +000029import warnings
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000030import weakref
31import gc
32import abc
Antoine Pitrou16b11de2010-08-21 19:17:57 +000033import signal
34import errno
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000035from itertools import chain, cycle, count
36from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000038
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000039import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000040import io # C implementation of io
41import _pyio as pyio # Python implementation of io
Guido van Rossum28524c72007-02-27 05:47:44 +000042
Guido van Rossuma9e20242007-03-08 00:43:48 +000043
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000044def _default_chunk_size():
45 """Get the default TextIOWrapper chunk size"""
46 with open(__file__, "r", encoding="latin1") as f:
47 return f._CHUNK_SIZE
48
49
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto().

    *read_stack* is a sequence of chunks handed out by readinto() in order;
    a ``None`` entry makes readinto() return ``None`` once.  Everything
    passed to write() is recorded in ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        # Total number of read calls, and how many of them happened after
        # the read stack was already exhausted.
        self._reads = 0
        self._extraneous_reads = 0

    def write(self, b):
        # Store an immutable copy so later mutation of b cannot alter it.
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the next queued chunk (or as much as fits) into *buf*.

        Returns the number of bytes copied, ``None`` if the next queued
        entry is ``None``, or 0 once the queue is empty.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            self._extraneous_reads += 1
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if len(data) <= max_len:
            # The whole chunk fits: consume it.
            del self._read_stack[0]
            buf[:n] = data
            return n
        else:
            # Partial read: fill buf and leave the remainder queued.
            buf[:] = data[:max_len]
            self._read_stack[0] = data[max_len:]
            return max_len

    def truncate(self, pos=None):
        return pos


class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    pass


class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    pass
111
112
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus an explicit read().

    read() hands out the queued chunks whole, one per call, and ignores the
    requested size.
    """

    def read(self, n=None):
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Queue exhausted: report EOF and count the surplus read.  Only
            # IndexError is expected here; anything else should propagate.
            self._extraneous_reads += 1
            return b""


class CMockRawIO(MockRawIO, io.RawIOBase):
    pass


class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000128
Guido van Rossuma9e20242007-03-08 00:43:48 +0000129
class MisbehavedRawIO(MockRawIO):
    """A raw stream whose methods deliberately return bogus results.

    write() and read() double what the parent returns, seek()/tell() return
    negative positions and readinto() claims more bytes than the buffer
    can hold.
    """

    def write(self, b):
        return super().write(b) * 2

    def read(self, n=None):
        return super().read(n) * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Perform the real read, then lie about how much was read.
        super().readinto(buf)
        return len(buf) * 5


class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    pass


class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    pass
152
153
class CloseFailureIO(MockRawIO):
    """A raw stream whose first close() marks it closed and raises IOError.

    Subsequent close() calls are no-ops because ``closed`` is then truthy.
    """
    closed = 0

    def close(self):
        if not self.closed:
            self.closed = 1
            raise IOError


class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    pass


class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    pass
167
168
class MockFileIO:
    """Mixin recording the result size of every read()/readinto() call.

    ``read_history`` receives the length of each read() result (or ``None``
    when read() returned ``None``) and the return value of each readinto().
    The actual I/O is delegated to the next class in the MRO.
    """

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        res = super().read(n)
        self.read_history.append(None if res is None else len(res))
        return res

    def readinto(self, b):
        res = super().readinto(b)
        self.read_history.append(res)
        return res


class CMockFileIO(MockFileIO, io.BytesIO):
    pass


class PyMockFileIO(MockFileIO, pyio.BytesIO):
    pass
190
191
class MockNonBlockWriterIO:
    """A writable raw stream that can simulate a blocking write.

    After block_on(char), the next write() containing *char* stores only the
    data preceding it and raises ``self.BlockingIOError`` (supplied by the
    concrete subclasses) with the number of bytes accepted.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and reset the record."""
        s = b"".join(self._write_stack)
        self._write_stack[:] = []
        return s

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        b = bytes(b)
        n = -1
        if self._blocker_char:
            try:
                n = b.index(self._blocker_char)
            except ValueError:
                # Blocker not in this chunk: write it in full below.
                pass
            else:
                # One-shot: the blocker is disarmed once it has triggered.
                self._blocker_char = None
                self._write_stack.append(b[:n])
                raise self.BlockingIOError(0, "test blocking", n)
        self._write_stack.append(b)
        return len(b)


class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    BlockingIOError = io.BlockingIOError


class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    BlockingIOError = pyio.BlockingIOError
236
Guido van Rossuma9e20242007-03-08 00:43:48 +0000237
class IOTest(unittest.TestCase):
    """General tests for open(), FileIO, BytesIO and the IOBase hierarchy.

    The implementation under test is reached through attributes such as
    ``self.open``, ``self.FileIO``, ``self.BytesIO`` and ``self.IOBase``.
    They are not defined in this class; presumably the test driver injects
    them for the C and Python variants -- TODO confirm.
    """

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        # Shared write/seek/truncate scenario; leaves b"hello world\n" in f.
        self.assertEqual(f.write(b"blah."), 5)
        f.truncate(0)
        # truncate() must not move the file position.
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        # Shared read/readinto/seek scenario; f must hold b"hello world\n".
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    LARGE = 2**31

    def large_file_ops(self, f):
        # Use unittest assertions rather than bare assert statements so the
        # preconditions are still checked when running under -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(IOError, fp.read)
                self.assertRaises(IOError, fp.readline)
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(IOError, fp.write, b"blah")
            self.assertRaises(IOError, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(IOError, fp.write, "blah")
            self.assertRaises(IOError, fp.writelines, ["blah\n"])

    def test_raw_file_io(self):
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; It takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                print("\nTesting large file ops skipped on %s." % sys.platform,
                      file=sys.stderr)
                print("It requires %d bytes and a long time." % self.LARGE,
                      file=sys.stderr)
                print("Use 'regrtest.py -u largefile test_io' to run it.",
                      file=sys.stderr)
                return
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                # The file must be closed even when the body raised.
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        # Destruction must run __del__, then close(), then flush().
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        a = array.array('i', range(10))
        # tobytes() replaces the deprecated (and later removed) tostring().
        n = len(a.tobytes())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        # Create a reference cycle so only the cyclic GC can reclaim f.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed

    def test_multi_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")
580
# Concrete variants of IOTest.  Nothing visible here binds them to the C or
# Python implementation; presumably the test driver sets the io attributes
# on each class -- TODO confirm.
class CIOTest(IOTest):
    pass


class PyIOTest(IOTest):
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000586
Guido van Rossuma9e20242007-03-08 00:43:48 +0000587
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    #
    # Concrete test classes mix this in and are expected to provide
    # self.tp (the buffered class under test) together with the
    # self.MockRawIO and self.CloseFailureIO raw stream classes.

    def test_detach(self):
        raw = self.MockRawIO()
        buf = self.tp(raw)
        self.assertIs(buf.detach(), raw)
        # A second detach() has nothing left to detach.
        self.assertRaises(ValueError, buf.detach)

    def test_fileno(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        self.assertEqual(42, bufio.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Invalid whence
        self.assertRaises(ValueError, bufio.seek, 0, -1)
        self.assertRaises(ValueError, bufio.seek, 0, 3)

    def test_override_destructor(self):
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        # flush() is only expected to be called for writable objects.
        if writable:
            self.assertEqual(record, [1, 2, 3])
        else:
            self.assertEqual(record, [1, 2])

    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        def _with():
            with bufio:
                pass
        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            self.tp(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception IOError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    def test_repr(self):
        raw = self.MockRawIO()
        b = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(b), "<%s>" % clsname)
        raw.name = "dummy"
        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)

    def test_flush_error_on_close(self):
        raw = self.MockRawIO()
        def bad_flush():
            raise IOError()
        raw.flush = bad_flush
        b = self.tp(raw)
        self.assertRaises(IOError, b.close) # exception not swallowed

    def test_multi_close(self):
        raw = self.MockRawIO()
        b = self.tp(raw)
        b.close()
        b.close()
        b.close()
        self.assertRaises(ValueError, b.flush)
695
Guido van Rossum78892e42007-04-06 17:31:18 +0000696
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000697class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
698 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000699
def test_constructor(self):
    # Re-running __init__ with valid buffer sizes must leave the object
    # usable; invalid (zero or negative) sizes must raise ValueError.
    rawio = self.MockRawIO([b"abc"])
    bufio = self.tp(rawio)
    bufio.__init__(rawio)
    bufio.__init__(rawio, buffer_size=1024)
    bufio.__init__(rawio, buffer_size=16)
    self.assertEqual(b"abc", bufio.read())
    self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
    self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
    self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
    # After the failed re-initialisations a valid one must still work.
    rawio = self.MockRawIO([b"abc"])
    bufio.__init__(rawio)
    self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000713
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000714 def test_read(self):
Benjamin Peterson6b59f772009-12-13 19:30:15 +0000715 for arg in (None, 7):
716 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
717 bufio = self.tp(rawio)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +0000718 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000719 # Invalid args
720 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000721
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000722 def test_read1(self):
723 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
724 bufio = self.tp(rawio)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +0000725 self.assertEqual(b"a", bufio.read(1))
726 self.assertEqual(b"b", bufio.read1(1))
727 self.assertEqual(rawio._reads, 1)
728 self.assertEqual(b"c", bufio.read1(100))
729 self.assertEqual(rawio._reads, 1)
730 self.assertEqual(b"d", bufio.read1(100))
731 self.assertEqual(rawio._reads, 2)
732 self.assertEqual(b"efg", bufio.read1(100))
733 self.assertEqual(rawio._reads, 3)
734 self.assertEqual(b"", bufio.read1(100))
735 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000736 # Invalid args
737 self.assertRaises(ValueError, bufio.read1, -1)
738
739 def test_readinto(self):
740 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
741 bufio = self.tp(rawio)
742 b = bytearray(2)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +0000743 self.assertEqual(bufio.readinto(b), 2)
744 self.assertEqual(b, b"ab")
745 self.assertEqual(bufio.readinto(b), 2)
746 self.assertEqual(b, b"cd")
747 self.assertEqual(bufio.readinto(b), 2)
748 self.assertEqual(b, b"ef")
749 self.assertEqual(bufio.readinto(b), 1)
750 self.assertEqual(b, b"gf")
751 self.assertEqual(bufio.readinto(b), 0)
752 self.assertEqual(b, b"gf")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000753
Benjamin Peterson6b59f772009-12-13 19:30:15 +0000754 def test_readlines(self):
755 def bufio():
756 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
757 return self.tp(rawio)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +0000758 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
759 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
760 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Peterson6b59f772009-12-13 19:30:15 +0000761
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000762 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000763 data = b"abcdefghi"
764 dlen = len(data)
765
766 tests = [
767 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
768 [ 100, [ 3, 3, 3], [ dlen ] ],
769 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
770 ]
771
772 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000773 rawio = self.MockFileIO(data)
774 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000775 pos = 0
776 for nbytes in buf_read_sizes:
Ezio Melotti19f2aeb2010-11-21 01:30:29 +0000777 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000778 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000779 # this is mildly implementation-dependent
Ezio Melotti19f2aeb2010-11-21 01:30:29 +0000780 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000781
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000782 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000783 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000784 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
785 bufio = self.tp(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000786
Ezio Melotti19f2aeb2010-11-21 01:30:29 +0000787 self.assertEqual(b"abcd", bufio.read(6))
788 self.assertEqual(b"e", bufio.read(1))
789 self.assertEqual(b"fg", bufio.read())
790 self.assertEqual(b"", bufio.peek(1))
Georg Brandlab91fde2009-08-13 08:51:18 +0000791 self.assertTrue(None is bufio.read())
Ezio Melotti19f2aeb2010-11-21 01:30:29 +0000792 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000793
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000794 def test_read_past_eof(self):
795 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
796 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000797
Ezio Melotti19f2aeb2010-11-21 01:30:29 +0000798 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000799
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000800 def test_read_all(self):
801 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
802 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000803
Ezio Melotti19f2aeb2010-11-21 01:30:29 +0000804 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000805
Antoine Pitrou27314942010-10-14 15:41:23 +0000806 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000807 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000808 try:
809 # Write out many bytes with exactly the same number of 0's,
810 # 1's... 255's. This will help us check that concurrent reading
811 # doesn't duplicate or forget contents.
812 N = 1000
813 l = list(range(256)) * N
814 random.shuffle(l)
815 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000816 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +0000817 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000818 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000819 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000820 errors = []
821 results = []
822 def f():
823 try:
824 # Intra-buffer read then buffer-flushing read
825 for n in cycle([1, 19]):
826 s = bufio.read(n)
827 if not s:
828 break
829 # list.append() is atomic
830 results.append(s)
831 except Exception as e:
832 errors.append(e)
833 raise
834 threads = [threading.Thread(target=f) for x in range(20)]
835 for t in threads:
836 t.start()
837 time.sleep(0.02) # yield
838 for t in threads:
839 t.join()
840 self.assertFalse(errors,
841 "the following exceptions were caught: %r" % errors)
842 s = b''.join(results)
843 for i in range(256):
844 c = bytes(bytearray([i]))
845 self.assertEqual(s.count(c), N)
846 finally:
847 support.unlink(support.TESTFN)
848
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000849 def test_misbehaved_io(self):
850 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
851 bufio = self.tp(rawio)
852 self.assertRaises(IOError, bufio.seek, 0)
853 self.assertRaises(IOError, bufio.tell)
854
Antoine Pitrou00091ca2010-08-11 13:38:10 +0000855 def test_no_extraneous_read(self):
856 # Issue #9550; when the raw IO object has satisfied the read request,
857 # we should not issue any additional reads, otherwise it may block
858 # (e.g. socket).
859 bufsize = 16
860 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
861 rawio = self.MockRawIO([b"x" * n])
862 bufio = self.tp(rawio, bufsize)
863 self.assertEqual(bufio.read(n), b"x" * n)
864 # Simple case: one raw read is enough to satisfy the request.
865 self.assertEqual(rawio._extraneous_reads, 0,
866 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
867 # A more complex case where two raw reads are needed to satisfy
868 # the request.
869 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
870 bufio = self.tp(rawio, bufsize)
871 self.assertEqual(bufio.read(n), b"x" * n)
872 self.assertEqual(rawio._extraneous_reads, 0,
873 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
874
875
class CBufferedReaderTest(BufferedReaderTest):
    """Run the shared reader tests, plus C-specific ones, on io.BufferedReader."""
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed re-initialization the object must refuse reads.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # Opening with "w+b" creates the file; remove it once the test ends.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Self-reference: only the cycle collector can reclaim the object.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000916
class PyBufferedReaderTest(BufferedReaderTest):
    """Run the shared BufferedReader tests against pyio.BufferedReader."""

    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000919
Guido van Rossuma9e20242007-03-08 00:43:48 +0000920
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    """Write-side tests shared by the BufferedWriter implementations.

    Subclasses set ``tp`` to the class under test.
    """
    write_mode = "wb"

    def test_constructor(self):
        # __init__ may be re-run on a live object with any positive size.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        # Zero and negative buffer sizes are rejected.
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        # A valid re-initialization afterwards makes the object usable again.
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # detach() pushes pending buffered data to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # intermediate_func(bufio) is called after every write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            # Clamp the final write to what is left.
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seek around and return to the same position: first with absolute
        # offsets, then with relative ones.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        # Seeking pushes pending writes to the raw stream, so an overwrite
        # at the start is visible before the final explicit flush.
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_destructor(self):
        # Dropping the last reference flushes buffered data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # The file is created below; remove it once the test ends.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            # truncate() does not move the stream position.
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def writer():
                    try:
                        while True:
                            try:
                                chunk = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(chunk)
                    except Exception as e:
                        # Record for the main thread, then let the thread
                        # die with the original error.
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=writer) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Every byte value must have been written exactly N times.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # seek(), tell() and an overflowing write on top of the misbehaving
        # raw stream raise IOError.
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing the third positional argument emits exactly one
        # DeprecationWarning.
        with support.check_warnings() as w:
            warnings.simplefilter("always", DeprecationWarning)
            self.tp(self.MockRawIO(), 8, 12)
            self.assertEqual(len(w.warnings), 1)
            warning = w.warnings[0]
            self.assertIs(warning.category, DeprecationWarning)
            self.assertEqual(str(warning.message),
                             "max_buffer_size is deprecated")
1140
1141
class CBufferedWriterTest(BufferedWriterTest):
    """Run the shared writer tests, plus C-specific ones, on io.BufferedWriter."""
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed re-initialization the object must refuse writes.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        # Opening with "w+b" creates the file; remove it once the test ends.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Self-reference: only the cycle collector can reclaim the object.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1179
1180
class PyBufferedWriterTest(BufferedWriterTest):
    """Run the shared BufferedWriter tests against pyio.BufferedWriter."""

    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001183
class BufferedRWPairTest(unittest.TestCase):
    """Tests shared by the BufferedRWPair implementations.

    Subclasses set ``tp`` to the class under test.
    """

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_detach(self):
        # A pair has no single underlying stream to detach.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_deprecation(self):
        # Passing the fourth positional argument emits exactly one
        # DeprecationWarning.
        with support.check_warnings() as w:
            warnings.simplefilter("always", DeprecationWarning)
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
            self.assertEqual(len(w.warnings), 1)
            warning = w.warnings[0]
            self.assertIs(warning.category, DeprecationWarning)
            self.assertEqual(str(warning.message),
                             "max_buffer_size is deprecated")

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        # None means "read everything".
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        # Each assertion gets a fresh pair over the same data.
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        # A None hint behaves like no hint (the original repeated the
        # no-argument call here).
        self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        # Each flush delivers the pending data to the writer as one chunk.
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        # peek() does not consume: the following read sees the same bytes.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        # The pair reports a tty if either side does.
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001306
class CBufferedRWPairTest(BufferedRWPairTest):
    """Run the shared BufferedRWPair tests against io.BufferedRWPair."""

    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001309
class PyBufferedRWPairTest(BufferedRWPairTest):
    """Run the shared BufferedRWPair tests against pyio.BufferedRWPair."""

    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001312
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001313
1314class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1315 read_mode = "rb+"
1316 write_mode = "wb+"
1317
def test_constructor(self):
    # Run the reader's constructor checks, then the writer's.
    for base in (BufferedReaderTest, BufferedWriterTest):
        base.test_constructor(self)
1321
def test_read_and_write(self):
    # Writes stay buffered until the following read pushes them out.
    raw_stream = self.MockRawIO((b"asdf", b"ghjk"))
    random_io = self.tp(raw_stream, 8)

    self.assertEqual(b"as", random_io.read(2))
    for chunk in (b"ddd", b"eee"):
        random_io.write(chunk)
    # Nothing has reached the raw stream yet.
    self.assertFalse(raw_stream._write_stack)
    self.assertEqual(b"ghjk", random_io.read())
    self.assertEqual(b"dddeee", raw_stream._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001332
def test_seek_and_tell(self):
    # Mix reads, a write and all three seek modes, checking tell().
    backing = self.BytesIO(b"asdfghjkl")
    random_io = self.tp(backing)

    self.assertEqual(b"as", random_io.read(2))
    self.assertEqual(2, random_io.tell())
    random_io.seek(0, 0)
    self.assertEqual(b"asdf", random_io.read(4))

    # Overwrite the four bytes following the current position.
    random_io.write(b"asdf")
    random_io.seek(0, 0)
    self.assertEqual(b"asdfasdfl", random_io.read())
    self.assertEqual(9, random_io.tell())
    # Relative to the end, then relative to the current position.
    random_io.seek(-4, 2)
    self.assertEqual(5, random_io.tell())
    random_io.seek(2, 1)
    self.assertEqual(7, random_io.tell())
    self.assertEqual(b"fl", random_io.read(11))
    # A float offset is rejected.
    with self.assertRaises(TypeError):
        random_io.seek(0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001352
def check_flush_and_read(self, read_func):
    # Shared driver: read_func(bufio[, n]) must return the bytes read and
    # advance the position past them.
    backing = self.BytesIO(b"abcdefghi")
    random_io = self.tp(backing)

    self.assertEqual(b"ab", read_func(random_io, 2))
    random_io.write(b"12")
    self.assertEqual(b"ef", read_func(random_io, 2))
    self.assertEqual(6, random_io.tell())
    random_io.flush()
    self.assertEqual(6, random_io.tell())
    self.assertEqual(b"ghi", read_func(random_io))
    # Change the underlying stream directly, bypassing the buffer.
    backing.seek(0, 0)
    backing.write(b"XYZ")
    # flush() resets the read buffer
    random_io.flush()
    random_io.seek(0, 0)
    self.assertEqual(b"XYZ", read_func(random_io, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001370
def test_flush_and_read(self):
    # Drive the shared scenario with plain read().
    def _read(bufio, *args):
        return bufio.read(*args)
    self.check_flush_and_read(_read)
1373
def test_flush_and_readinto(self):
    # Drive the shared scenario with readinto().
    def _readinto(bufio, n=-1):
        # A negative size means "as much as possible": use a big buffer.
        if n >= 0:
            target = bytearray(n)
        else:
            target = bytearray(9999)
        filled = bufio.readinto(target)
        return bytes(target[:filled])
    self.check_flush_and_read(_readinto)
1380
def test_flush_and_peek(self):
    # Drive the shared scenario with peek() followed by a relative seek.
    def _peek(bufio, n=-1):
        # This relies on the fact that the buffer can contain the whole
        # raw stream, otherwise peek() can return less.
        peeked = bufio.peek(n)
        data = peeked if n == -1 else peeked[:n]
        # peek() does not advance the position, so do it by hand.
        bufio.seek(len(data), 1)
        return data
    self.check_flush_and_read(_peek)
1391
def test_flush_and_write(self):
    # Consecutive flushed writes overwrite the start of the stream in order.
    backing = self.BytesIO(b"abcdefghi")
    random_io = self.tp(backing)

    for chunk in (b"123", b"45"):
        random_io.write(chunk)
        random_io.flush()
    random_io.seek(0, 0)
    expected = b"12345fghi"
    self.assertEqual(expected, backing.getvalue())
    self.assertEqual(expected, random_io.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001403
def test_threads(self):
    # Run the reader's threading test, then the writer's.
    for base in (BufferedReaderTest, BufferedWriterTest):
        base.test_threads(self)
1407
def test_writes_and_peek(self):
    # First pass: peek at the current position between writes.
    def _peek_in_place(bufio):
        bufio.peek(1)
    self.check_writes(_peek_in_place)

    # Second pass: step back one byte, peek, then restore the position.
    def _peek_behind(bufio):
        saved = bufio.tell()
        bufio.seek(-1, 1)
        bufio.peek(1)
        bufio.seek(saved, 0)
    self.check_writes(_peek_behind)
1418
def test_writes_and_reads(self):
    # Interleave writes with a one-byte read() of the byte just written.
    def _reread_last_byte(bufio):
        bufio.seek(-1, 1)
        bufio.read(1)
    self.check_writes(_reread_last_byte)
1424
def test_writes_and_read1s(self):
    # Interleave writes with a one-byte read1() of the byte just written.
    def _reread_last_byte(bufio):
        bufio.seek(-1, 1)
        bufio.read1(1)
    self.check_writes(_reread_last_byte)
1430
def test_writes_and_readintos(self):
    # Interleave writes with a one-byte readinto() of the byte just written.
    def _reread_last_byte(bufio):
        bufio.seek(-1, 1)
        scratch = bytearray(1)
        bufio.readinto(scratch)
    self.check_writes(_reread_last_byte)
1436
def test_write_after_readahead(self):
    # Issue #6629: writing after the buffer was filled by readahead should
    # first rewind the raw stream.
    for overwrite_size in [1, 5]:
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 4)
        # Trigger readahead
        self.assertEqual(bufio.read(1), b"A")
        self.assertEqual(bufio.tell(), 1)
        # Overwriting should rewind the raw stream if it needs so
        payload = b"B" * overwrite_size
        bufio.write(payload)
        end_pos = overwrite_size + 1
        self.assertEqual(bufio.tell(), end_pos)
        # If the write size was smaller than the buffer size, flush() and
        # check that rewind happens.
        bufio.flush()
        self.assertEqual(bufio.tell(), end_pos)
        expected = b"A" + payload + b"A" * (9 - overwrite_size)
        self.assertEqual(raw.getvalue(), expected)
1456
def test_truncate_after_read_or_write(self):
    # truncate() without an argument must use the logical position, both
    # after a buffered read and after a buffered write.
    backing = self.BytesIO(b"A" * 10)
    rw = self.tp(backing, 100)
    # the read buffer gets filled
    self.assertEqual(rw.read(2), b"AA")
    self.assertEqual(rw.truncate(), 2)
    # the write buffer increases
    self.assertEqual(rw.write(b"BB"), 2)
    self.assertEqual(rw.truncate(), 4)
1464
def test_misbehaved_io(self):
    # Reuse both the reader and the writer misbehaved-raw-stream scenarios.
    for base in (BufferedReaderTest, BufferedWriterTest):
        base.test_misbehaved_io(self)
1468
class CBufferedRandomTest(BufferedRandomTest):
    # Runs the BufferedRandom tests against the C implementation.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        too_big_errors = (OverflowError, MemoryError, ValueError)
        self.assertRaises(too_big_errors,
                          bufio.__init__, rawio, sys.maxsize)

    def test_garbage_collection(self):
        # Reuse the C reader and C writer garbage-collection scenarios.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)
1485
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the BufferedRandom tests against the pure-Python implementation.
    tp = pyio.BufferedRandom
1488
1489
# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
# properties:
#   - A single output character can correspond to many bytes of input.
#   - The number of input bytes to complete the character can be
#     undetermined until the last input byte is received.
#   - The number of input bytes can vary depending on previous input.
#   - A single input byte can correspond to many characters of output.
#   - The number of output characters can be undetermined until the
#     last input byte is received.
#   - The number of output characters can vary depending on previous input.
1500
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  A new I or O applies to the words
    that follow it.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        super().__init__(errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I=1, O=1, nothing buffered."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return ``(pending_bytes, flags)`` describing the current state.

        I and O are packed into a single integer as ``i*100 + o`` after
        XOR-ing each with 1, so that flags == 0 right after reset().
        """
        i, o = self.i ^ 1, self.o ^ 1
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously returned by getstate()."""
        buffer, io = state
        self.buffer = bytearray(buffer)
        i, o = divmod(io, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Consume the bytes of *input* and return the text produced.

        Bytes that do not yet complete a word stay buffered; when *final*
        is true a non-empty buffer is emitted as the last word.
        """
        pieces = []
        for b in input:
            # self.i may change inside process_word(), so it is re-read
            # for every byte.
            if self.i == 0:  # variable-length, terminated with period
                if b == ord('.'):
                    if self.buffer:
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else:  # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer:  # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Interpret the buffered word, clear the buffer, return its output.

        'i...' and 'o...' words update I and O and produce no output;
        any other word is padded/truncated to O characters (when O is
        non-zero) and followed by a period.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o  # pad out with hyphens
            if self.o:
                output = output[:self.o]  # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # Codec search is a no-op unless a test switches this on.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: serve 'test_decoder' only while enabled.

        Returns a CodecInfo whose incremental decoder is this class and
        whose encoder is latin-1's, or None when the codec is disabled or
        *name* is anything else.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None
1585
# Install the search function for the decoder defined above.  It stays
# inert until a test sets StatefulIncrementalDecoder.codecEnabled.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1589
1590
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for data, at_eof, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(data, at_eof), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', True), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001633
1634class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001635
def setUp(self):
    # Sample data with mixed line endings, plus its "\n"-only form.
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    normalized_bytes = b"AAA\nBBB\nCCC\nDDD\nEEE\n"
    self.normalized = normalized_bytes.decode("ascii")
    # Start every test without a leftover scratch file.
    support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001640
def tearDown(self):
    # Remove the scratch file a test may have created.
    scratch = support.TESTFN
    support.unlink(scratch)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001643
def test_constructor(self):
    # Re-running __init__ on an existing wrapper must reconfigure it.
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    wrapper = self.TextIOWrapper(buffered)

    wrapper.__init__(buffered, encoding="latin1", newline="\r\n")
    self.assertEqual(wrapper.encoding, "latin1")
    self.assertEqual(wrapper.line_buffering, False)

    wrapper.__init__(buffered, encoding="utf8", line_buffering=True)
    self.assertEqual(wrapper.encoding, "utf8")
    self.assertEqual(wrapper.line_buffering, True)
    self.assertEqual("\xe9\n", wrapper.readline())

    # Invalid newline arguments are rejected.
    self.assertRaises(TypeError, wrapper.__init__, buffered, newline=42)
    self.assertRaises(ValueError, wrapper.__init__, buffered,
                      newline='xyzzy')
1657
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)
    wrapper = self.TextIOWrapper(buffered)
    # detach() hands back the underlying buffer object itself.
    self.assertIs(wrapper.detach(), buffered)

    wrapper = self.TextIOWrapper(buffered, encoding="ascii")
    wrapper.write("howdy")
    # Nothing has reached the raw stream yet ...
    self.assertFalse(raw.getvalue())
    wrapper.detach()
    # ... but detaching pushes the pending text through.
    self.assertEqual(raw.getvalue(), b"howdy")
    # A second detach() on the same wrapper is an error.
    self.assertRaises(ValueError, wrapper.detach)
1670
def test_repr(self):
    # repr() shows the encoding, and the name once the raw stream has one.
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    wrapper = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    expected = "<%s.TextIOWrapper encoding='utf-8'>" % modname
    self.assertEqual(repr(wrapper), expected)

    raw.name = "dummy"
    expected = "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(wrapper), expected)

    raw.name = b"dummy"
    expected = "<%s.TextIOWrapper name=b'dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(wrapper), expected)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001684
def test_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    wrapper = self.TextIOWrapper(buffered, newline="\n",
                                 line_buffering=True)
    wrapper.write("X")
    # No flush happened
    self.assertEqual(raw.getvalue(), b"")
    wrapper.write("Y\nZ")
    # All got flushed
    self.assertEqual(raw.getvalue(), b"XY\nZ")
    wrapper.write("A\rB")
    self.assertEqual(raw.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001695
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    raw = self.BytesIO()
    explicit = self.TextIOWrapper(raw, encoding="utf8")
    self.assertEqual(explicit.encoding, "utf8")
    defaulted = self.TextIOWrapper(raw)
    self.assertIsNotNone(defaulted.encoding)
    # Raises LookupError if the default is not a known codec.
    codecs.lookup(defaulted.encoding)
1704
def test_encoding_errors_reading(self):
    payload = b"abc\n\xff\n"
    # (1) default and (2) explicit strict: undecodable input raises
    for extra in ({}, {"errors": "strict"}):
        raw = self.BytesIO(payload)
        wrapper = self.TextIOWrapper(raw, encoding="ascii", **extra)
        self.assertRaises(UnicodeError, wrapper.read)
    # (3) ignore and (4) replace: the bad byte is dropped or substituted
    for handler, expected in (("ignore", "abc\n\n"),
                              ("replace", "abc\n\ufffd\n")):
        raw = self.BytesIO(payload)
        wrapper = self.TextIOWrapper(raw, encoding="ascii", errors=handler)
        self.assertEqual(wrapper.read(), expected)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001722
def test_encoding_errors_writing(self):
    # (1) default and (2) explicit strict: unencodable text raises
    for extra in ({}, {"errors": "strict"}):
        raw = self.BytesIO()
        wrapper = self.TextIOWrapper(raw, encoding="ascii", **extra)
        self.assertRaises(UnicodeError, wrapper.write, "\xff")
    # (3) ignore and (4) replace: the bad character is dropped or
    # written as "?"
    for handler, expected in (("ignore", b"abcdef\n"),
                              ("replace", b"abc?def\n")):
        raw = self.BytesIO()
        wrapper = self.TextIOWrapper(raw, encoding="ascii", errors=handler,
                                     newline="\n")
        wrapper.write("abc\xffdef\n")
        wrapper.flush()
        self.assertEqual(raw.getvalue(), expected)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001746
def test_newlines(self):
    input_lines = ["unix\n", "windows\r\n", "os9\r", "last\n", "nonl"]

    # Each entry: [newline argument, lines expected back from the wrapper]
    tests = [
        [None, ['unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl']],
        ['', input_lines],
        ['\n', ["unix\n", "windows\r\n", "os9\rlast\n", "nonl"]],
        ['\r\n', ["unix\nwindows\r\n", "os9\rlast\nnonl"]],
        ['\r', ["unix\nwindows\r", "\nos9\r", "last\nnonl"]],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    text = ''.join(input_lines)
    for encoding in encodings:
        data = text.encode(encoding)
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    if not do_reads:
                        got_lines = list(textio)
                    else:
                        # Mix read(2) with readline() on every line.
                        got_lines = []
                        while True:
                            head = textio.read(2)
                            if head == '':
                                break
                            self.assertEqual(len(head), 2)
                            got_lines.append(head + textio.readline())

                    for got_line, exp_line in zip(got_lines, exp_lines):
                        self.assertEqual(got_line, exp_line)
                    self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00001788
def test_newlines_input(self):
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    # (newline argument, lines readlines() must return)
    cases = [
        (None, normalized.decode("ascii").splitlines(True)),
        ("", testdata.decode("ascii").splitlines(True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, expected in cases:
        raw = self.BytesIO(testdata)
        wrapper = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        self.assertEqual(wrapper.readlines(), expected)
        # read() after rewinding must agree with readlines().
        wrapper.seek(0)
        self.assertEqual(wrapper.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00001804
def test_newlines_output(self):
    # Maps each explicit newline argument to the bytes it must produce.
    testdict = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
        }
    # newline=None must behave like the platform's own line separator.
    tests = [(None, testdict[os.linesep])]
    tests.extend(sorted(testdict.items()))
    for newline, expected in tests:
        raw = self.BytesIO()
        wrapper = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        for piece in ("AAA\nB", "BB\nCCC\n", "X\rY\r\nZ"):
            wrapper.write(piece)
        wrapper.flush()
        self.assertEqual(raw.closed, False)
        self.assertEqual(raw.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001822
def test_destructor(self):
    # Dropping the last reference to a wrapper must flush and close the
    # underlying stream.
    seen_at_close = []
    base = self.BytesIO
    class MyBytesIO(base):
        def close(self):
            # Record the content as it is when close() is called.
            seen_at_close.append(self.getvalue())
            base.close(self)
    raw = MyBytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.write("abc")
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001836
def test_override_destructor(self):
    # Overridden __del__, close() and flush() must run in that order
    # when the object is destroyed.
    record = []
    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            record.append(1)
            # The base class may or may not define __del__.
            try:
                parent_del = super().__del__
            except AttributeError:
                return
            parent_del()
        def close(self):
            record.append(2)
            super().close()
        def flush(self):
            record.append(3)
            super().flush()
    raw = self.BytesIO()
    wrapper = MyTextIO(raw, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(record, [1, 2, 3])
1859
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()
    def touch_missing_attribute():
        self.TextIOWrapper(rawio).xyzzy
    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    message = captured.getvalue().strip()
    if not message:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(message.splitlines()), 1)
    self.assertTrue(message.startswith("Exception IOError: "), message)
    self.assertTrue(message.endswith(" ignored"), message)
Guido van Rossum8358db22007-08-18 21:39:55 +00001874
# Systematic tests of the text I/O API
1876
def test_basic_io(self):
    # Write, read, seek and tell a short text file over a range of chunk
    # sizes and encodings.
    chunk_sizes = (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65)
    encodings = ("ascii", "latin1", "utf8")  # , "utf-16-be", "utf-16-le"
    for chunksize in chunk_sizes:
        for enc in encodings:
            f = self.open(support.TESTFN, "w+", encoding=enc)
            f._CHUNK_SIZE = chunksize
            self.assertEqual(f.write("abc"), 3)
            f.close()

            f = self.open(support.TESTFN, "r+", encoding=enc)
            f._CHUNK_SIZE = chunksize
            self.assertEqual(f.tell(), 0)
            self.assertEqual(f.read(), "abc")
            # Position cookie for the end of "abc".
            cookie = f.tell()
            self.assertEqual(f.seek(0), 0)
            self.assertEqual(f.read(None), "abc")
            f.seek(0)
            self.assertEqual(f.read(2), "ab")
            self.assertEqual(f.read(1), "c")
            self.assertEqual(f.read(1), "")
            self.assertEqual(f.read(), "")
            self.assertEqual(f.tell(), cookie)
            self.assertEqual(f.seek(0), 0)
            self.assertEqual(f.seek(0, 2), cookie)
            self.assertEqual(f.write("def"), 3)
            self.assertEqual(f.seek(cookie), cookie)
            self.assertEqual(f.read(), "def")
            if enc.startswith("utf"):
                self.multi_line_test(f, enc)
            f.close()
1905
def multi_line_test(self, f, enc):
    # Helper: write lines of many lengths to f, then read them back and
    # check that both the text and the tell() position of every line match.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    written = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to build `size` characters.
        body = "".join([sample[k % len(sample)] for k in range(size)])
        line = body + "\n"
        written.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    read_back = []
    while True:
        pos = f.tell()
        line = f.readline()
        if not line:
            break
        read_back.append((pos, line))
    self.assertEqual(read_back, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001927
def test_telling(self):
    f = self.open(support.TESTFN, "w+", encoding="utf8")
    # Record the position before and after each of two written lines.
    positions = [f.tell()]
    for _ in range(2):
        f.write("\xff\n")
        positions.append(f.tell())
    p0, p1, p2 = positions
    f.seek(0)
    self.assertEqual(f.tell(), p0)
    self.assertEqual(f.readline(), "\xff\n")
    self.assertEqual(f.tell(), p1)
    self.assertEqual(f.readline(), "\xff\n")
    self.assertEqual(f.tell(), p2)
    f.seek(0)
    for line in f:
        self.assertEqual(line, "\xff\n")
        # tell() is expected to fail while iteration is in progress.
        self.assertRaises(IOError, f.tell)
    self.assertEqual(f.tell(), p2)
    f.close()
1947
def test_seeking(self):
    # Place a multi-byte character two characters before the default
    # chunk size, then check read()/tell()/readline() around it.
    chunk_size = _default_chunk_size()
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    # The prefix is pure ASCII, so character and byte counts agree.
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = u_suffix.encode("utf-8")
    line = prefix + suffix
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    # Use a context manager so the read handle is closed even when an
    # assertion fails (the original never closed it).
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        s = f.read(prefix_size)
        self.assertEqual(s, str(prefix, "ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00001965
def test_seeking_too(self):
    # Regression test for a specific bug
    data = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as f:
        f.write(data)
    # Use a context manager so the read handle is always closed (the
    # original left it open).
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        # Must not raise.
        f.tell()
1977
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        f = self.open(support.TESTFN, 'wb')
        f.write(data)
        f.close()
        f = self.open(support.TESTFN, encoding='test_decoder')
        f._CHUNK_SIZE = CHUNK_SIZE
        decoded = f.read()
        f.close()

        total = len(decoded)
        for i in range(min_pos, total + 1):  # seek positions
            for j in [1, 5, total - i]:  # read lengths
                f = self.open(support.TESTFN, encoding='test_decoder')
                self.assertEqual(f.read(i), decoded[:i])
                cookie = f.tell()
                self.assertEqual(f.read(j), decoded[i:i + j])
                f.seek(cookie)
                self.assertEqual(f.read(), decoded[i:])
                f.close()

    cases = StatefulIncrementalDecoderTest.test_cases

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for case_input, _, _ in cases:
            test_seek_and_tell_with_data(case_input)

        # Position each test case so that it crosses a chunk boundary.
        for case_input, _, _ in cases:
            offset = CHUNK_SIZE - len(case_input)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + case_input, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002024
def test_encoded_writes(self):
    data = "1234567890"
    doubled = data * 2
    tests = ("utf-16",
             "utf-16-le",
             "utf-16-be",
             "utf-32",
             "utf-32-le",
             "utf-32-be")
    for encoding in tests:
        raw = self.BytesIO()
        f = self.TextIOWrapper(raw, encoding=encoding)
        # Check if the BOM is written only once (see issue1753).
        f.write(data)
        f.write(data)
        # Read everything back twice, rewinding before each read.
        for _ in range(2):
            f.seek(0)
            self.assertEqual(f.read(), doubled)
        self.assertEqual(raw.getvalue(), doubled.encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002044
Benjamin Petersona1b49012009-03-31 23:11:32 +00002045 def test_unreadable(self):
2046 class UnReadable(self.BytesIO):
2047 def readable(self):
2048 return False
2049 txt = self.TextIOWrapper(UnReadable())
2050 self.assertRaises(IOError, txt.read)
2051
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002052 def test_read_one_by_one(self):
2053 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002054 reads = ""
2055 while True:
2056 c = txt.read(1)
2057 if not c:
2058 break
2059 reads += c
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002060 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002061
Benjamin Peterson6b59f772009-12-13 19:30:15 +00002062 def test_readlines(self):
2063 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2064 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2065 txt.seek(0)
2066 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2067 txt.seek(0)
2068 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2069
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002070 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002071 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002072 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002073 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002074 reads = ""
2075 while True:
2076 c = txt.read(128)
2077 if not c:
2078 break
2079 reads += c
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002080 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002081
2082 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002083 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002084
2085 # read one char at a time
2086 reads = ""
2087 while True:
2088 c = txt.read(1)
2089 if not c:
2090 break
2091 reads += c
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002092 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002093
2094 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002095 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002096 txt._CHUNK_SIZE = 4
2097
2098 reads = ""
2099 while True:
2100 c = txt.read(4)
2101 if not c:
2102 break
2103 reads += c
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002104 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002105
2106 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002107 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002108 txt._CHUNK_SIZE = 4
2109
2110 reads = txt.read(4)
2111 reads += txt.read(4)
2112 reads += txt.readline()
2113 reads += txt.readline()
2114 reads += txt.readline()
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002115 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002116
2117 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002118 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002119 txt._CHUNK_SIZE = 4
2120
2121 reads = txt.read(4)
2122 reads += txt.read()
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002123 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002124
2125 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002126 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002127 txt._CHUNK_SIZE = 4
2128
2129 reads = txt.read(4)
2130 pos = txt.tell()
2131 txt.seek(0)
2132 txt.seek(pos)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002133 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002134
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002135 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002136 buffer = self.BytesIO(self.testdata)
2137 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002138
2139 self.assertEqual(buffer.seekable(), txt.seekable())
2140
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        # Create the file; the expected bytes are whatever a one-shot
        # encode of the same text produces.
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaa'.encode(charset))

        # Append more text and compare against a one-shot encode of the
        # concatenated text.
        with self.open(filename, 'a', encoding=charset) as f:
            f.write('xxx')
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002155
def test_seek_bom(self):
    # Same test, but when seeking manually
    path = support.TESTFN
    for codec in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=codec) as writer:
            writer.write('aaa')
            end_of_text = writer.tell()
        # Write past the existing text first, then overwrite the start.
        with self.open(path, 'r+', encoding=codec) as updater:
            updater.seek(end_of_text)
            updater.write('zzz')
            updater.seek(0)
            updater.write('bbb')
        with self.open(path, 'rb') as reader:
            on_disk = reader.read()
            self.assertEqual(on_disk, 'bbbzzz'.encode(codec))
Antoine Pitroue4501852009-05-14 18:55:55 +00002170
def test_errors_property(self):
    # The errors attribute is "strict" when nothing is passed, otherwise
    # it reflects the errors argument given to open().
    cases = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for extra_kwargs, expected in cases:
        with self.open(support.TESTFN, "w", **extra_kwargs) as f:
            self.assertEqual(f.errors, expected)
2176
Antoine Pitroue4501852009-05-14 18:55:55 +00002177
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    start_signal = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def worker(index):
            line = "Thread%03d\n" % index
            # Hold every thread back until the event is set below.
            start_signal.wait()
            f.write(line)
        workers = []
        for index in range(20):
            workers.append(threading.Thread(target=worker, args=(index,)))
        for thread in workers:
            thread.start()
        time.sleep(0.02)
        start_signal.set()
        for thread in workers:
            thread.join()
    with self.open(support.TESTFN) as f:
        written = f.read()
        # Each thread's line must appear exactly once.
        for index in range(20):
            self.assertEqual(written.count("Thread%03d\n" % index), 1)
Amaury Forgeot d'Arcaf0312a2009-08-29 23:19:16 +00002198
Antoine Pitroufaf90072010-05-03 16:58:19 +00002199 def test_flush_error_on_close(self):
2200 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2201 def bad_flush():
2202 raise IOError()
2203 txt.flush = bad_flush
2204 self.assertRaises(IOError, txt.close) # exception not swallowed
2205
2206 def test_multi_close(self):
2207 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2208 txt.close()
2209 txt.close()
2210 txt.close()
2211 self.assertRaises(ValueError, txt.flush)
2212
class CTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapperTest plus checks aimed at the C TextIOWrapper."""

    def test_initialization(self):
        # Re-running __init__ with a bad newline argument must raise, and
        # the wrapper must refuse to read afterwards.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        for exc_type, bad_newline in ((TypeError, 42),
                                      (ValueError, 'xyzzy')):
            self.assertRaises(exc_type, wrapper.__init__, buffered,
                              newline=bad_newline)
            self.assertRaises(ValueError, wrapper.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        raw = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(raw)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        # Make the wrapper part of a reference cycle.
        wrapper.x = wrapper
        ref = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertTrue(ref() is None, ref)
        with self.open(support.TESTFN, "rb") as f:
            on_disk = f.read()
            self.assertEqual(on_disk, b"456def")
2239
class PyTextIOWrapperTest(TextIOWrapperTest):
    """Runs the inherited TextIOWrapperTest cases with no additions.

    test_main() sets the pyio names on every test class whose name starts
    with "Py".
    """
2242
2243
2244class IncrementalNewlineDecoderTest(unittest.TestCase):
2245
2246 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002247 # UTF-8 specific tests for a newline decoder
2248 def _check_decode(b, s, **kwargs):
2249 # We exercise getstate() / setstate() as well as decode()
2250 state = decoder.getstate()
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002251 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002252 decoder.setstate(state)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002253 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002254
Antoine Pitrou180a3362008-12-14 16:36:46 +00002255 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002256
Antoine Pitrou180a3362008-12-14 16:36:46 +00002257 _check_decode(b'\xe8', "")
2258 _check_decode(b'\xa2', "")
2259 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002260
Antoine Pitrou180a3362008-12-14 16:36:46 +00002261 _check_decode(b'\xe8', "")
2262 _check_decode(b'\xa2', "")
2263 _check_decode(b'\x88', "\u8888")
2264
2265 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002266 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2267
Antoine Pitrou180a3362008-12-14 16:36:46 +00002268 decoder.reset()
2269 _check_decode(b'\n', "\n")
2270 _check_decode(b'\r', "")
2271 _check_decode(b'', "\n", final=True)
2272 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002273
Antoine Pitrou180a3362008-12-14 16:36:46 +00002274 _check_decode(b'\r', "")
2275 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002276
Antoine Pitrou180a3362008-12-14 16:36:46 +00002277 _check_decode(b'\r\r\n', "\n\n")
2278 _check_decode(b'\r', "")
2279 _check_decode(b'\r', "\n")
2280 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002281
Antoine Pitrou180a3362008-12-14 16:36:46 +00002282 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2283 _check_decode(b'\xe8\xa2\x88', "\u8888")
2284 _check_decode(b'\n', "\n")
2285 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2286 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002287
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002288 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002289 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002290 if encoding is not None:
2291 encoder = codecs.getincrementalencoder(encoding)()
2292 def _decode_bytewise(s):
2293 # Decode one byte at a time
2294 for b in encoder.encode(s):
2295 result.append(decoder.decode(bytes([b])))
2296 else:
2297 encoder = None
2298 def _decode_bytewise(s):
2299 # Decode one char at a time
2300 for c in s:
2301 result.append(decoder.decode(c))
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002302 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002303 _decode_bytewise("abc\n\r")
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002304 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00002305 _decode_bytewise("\nabc")
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002306 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002307 _decode_bytewise("abc\r")
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002308 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002309 _decode_bytewise("abc")
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002310 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002311 _decode_bytewise("abc\r")
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002312 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002313 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002314 input = "abc"
2315 if encoder is not None:
2316 encoder.reset()
2317 input = encoder.encode(input)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002318 self.assertEqual(decoder.decode(input), "abc")
2319 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002320
2321 def test_newline_decoder(self):
2322 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002323 # None meaning the IncrementalNewlineDecoder takes unicode input
2324 # rather than bytes input
2325 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002326 'utf-16', 'utf-16-le', 'utf-16-be',
2327 'utf-32', 'utf-32-le', 'utf-32-be',
2328 )
2329 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002330 decoder = enc and codecs.getincrementaldecoder(enc)()
2331 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2332 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002333 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002334 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2335 self.check_newline_decoding_utf8(decoder)
2336
Antoine Pitrou66913e22009-03-06 23:40:56 +00002337 def test_newline_bytes(self):
2338 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2339 def _check(dec):
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002340 self.assertEqual(dec.newlines, None)
2341 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2342 self.assertEqual(dec.newlines, None)
2343 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2344 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00002345 dec = self.IncrementalNewlineDecoder(None, translate=False)
2346 _check(dec)
2347 dec = self.IncrementalNewlineDecoder(None, translate=True)
2348 _check(dec)
2349
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Runs the inherited cases unchanged.

    test_main() sets the C io names on every test class whose name starts
    with "C".
    """
2352
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Runs the inherited cases unchanged.

    test_main() sets the pyio names on every test class whose name starts
    with "Py".
    """
Antoine Pitrou180a3362008-12-14 16:36:46 +00002355
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002356
Guido van Rossum01a27522007-03-07 01:00:12 +00002357# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002358
class MiscIOTest(unittest.TestCase):
    """Miscellaneous checks of an io namespace and of open() results.

    Subclasses supply the ``io`` attribute.  The other names read from
    ``self`` (``open``, ``IOBase``, ``BlockingIOError``, ...) are not
    defined in this class; test_main() sets them on the subclasses.
    """

    def tearDown(self):
        # Remove the scratch file the tests below create.
        support.unlink(support.TESTFN)

    def test___all__(self):
        # Every exported name must exist.  Apart from open and the SEEK_*
        # constants, each must be either an exception class or an IOBase
        # subclass.
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertTrue(obj is not None, name)
            if name == "open":
                continue
            elif "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase), name)

    def test_attributes(self):
        # Each file is opened in a with block so it is closed even when
        # one of the assertions fails.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        with self.open(support.TESTFN, "U") as f:
            self.assertEqual(f.name, support.TESTFN)
            self.assertEqual(f.buffer.name, support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode, "U")
            self.assertEqual(f.buffer.mode, "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        with self.open(support.TESTFN, "w+") as f:
            self.assertEqual(f.mode, "w+")
            self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
            self.assertEqual(f.buffer.raw.mode, "rb+")

            # A second file object on the same descriptor; closefd=False
            # leaves the descriptor to be closed by f.
            with self.open(f.fileno(), "wb", closefd=False) as g:
                self.assertEqual(g.mode, "wb")
                self.assertEqual(g.raw.mode, "wb")
                self.assertEqual(g.name, f.fileno())
                self.assertEqual(g.raw.name, f.fileno())

    def test_io_after_close(self):
        # Every I/O method of a closed file must raise ValueError,
        # whatever mode and buffering the file was opened with.
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            # peek, read1 and readinto are only checked where present.
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            # Pass bytes or str to write() to match the file's mode.
            self.assertRaises(ValueError, f.write,
                              b"" if "b" in kwargs['mode'] else "")
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        self.assertRaises(TypeError, self.BlockingIOError)
        self.assertRaises(TypeError, self.BlockingIOError, 1)
        self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
        self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
        b = self.BlockingIOError(1, "")
        self.assertEqual(b.characters_written, 0)
        class C(str):
            pass
        c = C("")
        b = self.BlockingIOError(1, c)
        # Build a reference cycle between the exception and its argument
        # and check that it gets collected.
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertTrue(wr() is None, wr)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        self.assertTrue(isinstance(self.IOBase, abc.ABCMeta))
        self.assertTrue(isinstance(self.RawIOBase, abc.ABCMeta))
        self.assertTrue(isinstance(self.BufferedIOBase, abc.ABCMeta))
        self.assertTrue(isinstance(self.TextIOBase, abc.ABCMeta))

    def _check_abc_inheritance(self, abcmodule):
        # abcmodule is any object exposing IOBase, RawIOBase,
        # BufferedIOBase and TextIOBase attributes.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertTrue(isinstance(f, abcmodule.IOBase))
            self.assertTrue(isinstance(f, abcmodule.RawIOBase))
            self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
            self.assertFalse(isinstance(f, abcmodule.TextIOBase))
        with self.open(support.TESTFN, "wb") as f:
            self.assertTrue(isinstance(f, abcmodule.IOBase))
            self.assertFalse(isinstance(f, abcmodule.RawIOBase))
            self.assertTrue(isinstance(f, abcmodule.BufferedIOBase))
            self.assertFalse(isinstance(f, abcmodule.TextIOBase))
        with self.open(support.TESTFN, "w") as f:
            self.assertTrue(isinstance(f, abcmodule.IOBase))
            self.assertFalse(isinstance(f, abcmodule.RawIOBase))
            self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
            self.assertTrue(isinstance(f, abcmodule.TextIOBase))

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)
2494
class CMiscIOTest(MiscIOTest):
    """MiscIOTest bound to the module imported in this file as ``io``."""

    # Namespace inspected by the inherited tests through self.io.
    io = io
2497
class PyMiscIOTest(MiscIOTest):
    """MiscIOTest bound to the module imported in this file as ``pyio``."""

    # Namespace inspected by the inherited tests through self.io.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002500
Antoine Pitrou16b11de2010-08-21 19:17:57 +00002501
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Checks that a write interrupted by SIGALRM runs the signal handler.

    Subclasses supply the ``io`` attribute naming the implementation to
    open files with.
    """

    def setUp(self):
        # Install our handler and remember the previous one for tearDown.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Raise ZeroDivisionError so the tests can tell the handler ran.
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler.

        item is the object written repeatedly to the pipe, bytes is the
        byte string expected to come out of the other end, and
        fdopen_kwargs are passed to open() for the write end of the pipe.
        """
        read_results = []
        def _read():
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        # Bound before the try block so the cleanup below can tell whether
        # open() succeeded.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            signal.alarm(1)
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed above.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            self.assertRaises(ZeroDivisionError,
                              wio.write, item * (1024 * 1024))
            t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            # Cancel the alarm in case it has not fired yet, so it cannot
            # go off after tearDown() has restored the previous handler.
            signal.alarm(0)
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2562
class CSignalsTest(SignalsTest):
    """SignalsTest bound to the module imported in this file as ``io``."""

    # Module whose open() the inherited helper calls through self.io.
    io = io
2565
class PySignalsTest(SignalsTest):
    """SignalsTest bound to the module imported in this file as ``pyio``."""

    # Module whose open() the inherited helper calls through self.io.
    io = pyio
2568
2569
def test_main():
    """Attach the io namespaces to the test classes and run them all.

    Classes whose name starts with "C" get the names from ``io``; classes
    whose name starts with "Py" get the names from ``pyio``.  Any other
    class is run as it is.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockRawIOWithoutRead)
    exported = io.__all__ + ["IncrementalNewlineDecoder"]
    c_namespace = dict((member, getattr(io, member)) for member in exported)
    py_namespace = dict((member, getattr(pyio, member)) for member in exported)
    module_globals = globals()
    # Each mock has a "C"-prefixed and a "Py"-prefixed variant at module
    # level; both namespaces expose it under the unprefixed name.
    for mock in mocks:
        c_namespace[mock.__name__] = module_globals["C" + mock.__name__]
    for mock in mocks:
        py_namespace[mock.__name__] = module_globals["Py" + mock.__name__]
    # Avoid turning open into a bound method.
    py_namespace["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            namespace = c_namespace
        elif test.__name__.startswith("Py"):
            namespace = py_namespace
        else:
            continue
        for attr_name, obj in namespace.items():
            setattr(test, attr_name, obj)

    support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002604
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()