blob: 5333bb6dfd7e7b834126bc9151040fd90b2f9bcb [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import threading
27import random
Guido van Rossum28524c72007-02-27 05:47:44 +000028import unittest
Benjamin Peterson59406a92009-03-26 17:10:29 +000029import warnings
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000030import weakref
31import gc
32import abc
Antoine Pitrou16b11de2010-08-21 19:17:57 +000033import signal
34import errno
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000035from itertools import chain, cycle, count
36from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000038
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000039import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000040import io # C implementation of io
41import _pyio as pyio # Python implementation of io
Guido van Rossum28524c72007-02-27 05:47:44 +000042
Guido van Rossuma9e20242007-03-08 00:43:48 +000043
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000044def _default_chunk_size():
45 """Get the default TextIOWrapper chunk size"""
46 with open(__file__, "r", encoding="latin1") as f:
47 return f._CHUNK_SIZE
48
49
Antoine Pitroue5e75c62010-09-14 18:53:07 +000050class MockRawIOWithoutRead:
51 """A RawIO implementation without read(), so as to exercise the default
52 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000053
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000054 def __init__(self, read_stack=()):
55 self._read_stack = list(read_stack)
56 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000057 self._reads = 0
Antoine Pitrou00091ca2010-08-11 13:38:10 +000058 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000059
Guido van Rossum01a27522007-03-07 01:00:12 +000060 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000061 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000062 return len(b)
63
64 def writable(self):
65 return True
66
Guido van Rossum68bbcd22007-02-27 17:19:33 +000067 def fileno(self):
68 return 42
69
70 def readable(self):
71 return True
72
Guido van Rossum01a27522007-03-07 01:00:12 +000073 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000074 return True
75
Guido van Rossum01a27522007-03-07 01:00:12 +000076 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000077 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000078
79 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000080 return 0 # same comment as above
81
82 def readinto(self, buf):
83 self._reads += 1
84 max_len = len(buf)
85 try:
86 data = self._read_stack[0]
87 except IndexError:
Antoine Pitrou00091ca2010-08-11 13:38:10 +000088 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000089 return 0
90 if data is None:
91 del self._read_stack[0]
92 return None
93 n = len(data)
94 if len(data) <= max_len:
95 del self._read_stack[0]
96 buf[:n] = data
97 return n
98 else:
99 buf[:] = data[:max_len]
100 self._read_stack[0] = data[max_len:]
101 return max_len
102
103 def truncate(self, pos=None):
104 return pos
105
Antoine Pitroue5e75c62010-09-14 18:53:07 +0000106class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
107 pass
108
109class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
110 pass
111
112
113class MockRawIO(MockRawIOWithoutRead):
114
115 def read(self, n=None):
116 self._reads += 1
117 try:
118 return self._read_stack.pop(0)
119 except:
120 self._extraneous_reads += 1
121 return b""
122
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000123class CMockRawIO(MockRawIO, io.RawIOBase):
124 pass
125
126class PyMockRawIO(MockRawIO, pyio.RawIOBase):
127 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000128
Guido van Rossuma9e20242007-03-08 00:43:48 +0000129
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000130class MisbehavedRawIO(MockRawIO):
131 def write(self, b):
132 return super().write(b) * 2
133
134 def read(self, n=None):
135 return super().read(n) * 2
136
137 def seek(self, pos, whence):
138 return -123
139
140 def tell(self):
141 return -456
142
143 def readinto(self, buf):
144 super().readinto(buf)
145 return len(buf) * 5
146
147class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
148 pass
149
150class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
151 pass
152
153
154class CloseFailureIO(MockRawIO):
155 closed = 0
156
157 def close(self):
158 if not self.closed:
159 self.closed = 1
160 raise IOError
161
162class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
163 pass
164
165class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
166 pass
167
168
169class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000170
171 def __init__(self, data):
172 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000173 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000174
175 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000176 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000177 self.read_history.append(None if res is None else len(res))
178 return res
179
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000180 def readinto(self, b):
181 res = super().readinto(b)
182 self.read_history.append(res)
183 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000184
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000185class CMockFileIO(MockFileIO, io.BytesIO):
186 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000187
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000188class PyMockFileIO(MockFileIO, pyio.BytesIO):
189 pass
190
191
192class MockNonBlockWriterIO:
193
194 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000195 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000196 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000197
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000198 def pop_written(self):
199 s = b"".join(self._write_stack)
200 self._write_stack[:] = []
201 return s
202
203 def block_on(self, char):
204 """Block when a given char is encountered."""
205 self._blocker_char = char
206
207 def readable(self):
208 return True
209
210 def seekable(self):
211 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000212
Guido van Rossum01a27522007-03-07 01:00:12 +0000213 def writable(self):
214 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000215
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000216 def write(self, b):
217 b = bytes(b)
218 n = -1
219 if self._blocker_char:
220 try:
221 n = b.index(self._blocker_char)
222 except ValueError:
223 pass
224 else:
225 self._blocker_char = None
226 self._write_stack.append(b[:n])
227 raise self.BlockingIOError(0, "test blocking", n)
228 self._write_stack.append(b)
229 return len(b)
230
231class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
232 BlockingIOError = io.BlockingIOError
233
234class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
235 BlockingIOError = pyio.BlockingIOError
236
Guido van Rossuma9e20242007-03-08 00:43:48 +0000237
Guido van Rossum28524c72007-02-27 05:47:44 +0000238class IOTest(unittest.TestCase):
239
Neal Norwitze7789b12008-03-24 06:18:09 +0000240 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000241 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000242
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000243 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000244 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000245
Guido van Rossum28524c72007-02-27 05:47:44 +0000246 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000247 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou66f9fea2010-01-31 23:20:26 +0000248 f.truncate(0)
249 self.assertEqual(f.tell(), 5)
250 f.seek(0)
251
252 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000253 self.assertEqual(f.seek(0), 0)
254 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000255 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000256 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000257 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000258 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000259 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000260 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000261 self.assertEqual(f.seek(-1, 2), 13)
262 self.assertEqual(f.tell(), 13)
Antoine Pitrou66f9fea2010-01-31 23:20:26 +0000263
Guido van Rossum87429772007-04-10 21:06:59 +0000264 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou66f9fea2010-01-31 23:20:26 +0000265 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000266 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000267
Guido van Rossum9b76da62007-04-11 01:09:03 +0000268 def read_ops(self, f, buffered=False):
269 data = f.read(5)
270 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000271 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000272 self.assertEqual(f.readinto(data), 5)
273 self.assertEqual(data, b" worl")
274 self.assertEqual(f.readinto(data), 2)
275 self.assertEqual(len(data), 5)
276 self.assertEqual(data[:2], b"d\n")
277 self.assertEqual(f.seek(0), 0)
278 self.assertEqual(f.read(20), b"hello world\n")
279 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000280 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000281 self.assertEqual(f.seek(-6, 2), 6)
282 self.assertEqual(f.read(5), b"world")
283 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000284 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000285 self.assertEqual(f.seek(-6, 1), 5)
286 self.assertEqual(f.read(5), b" worl")
287 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000288 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000289 if buffered:
290 f.seek(0)
291 self.assertEqual(f.read(), b"hello world\n")
292 f.seek(6)
293 self.assertEqual(f.read(), b"world\n")
294 self.assertEqual(f.read(), b"")
295
Guido van Rossum34d69e52007-04-10 20:08:41 +0000296 LARGE = 2**31
297
Guido van Rossum53807da2007-04-10 19:01:47 +0000298 def large_file_ops(self, f):
299 assert f.readable()
300 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000301 self.assertEqual(f.seek(self.LARGE), self.LARGE)
302 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000303 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000304 self.assertEqual(f.tell(), self.LARGE + 3)
305 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000306 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000307 self.assertEqual(f.tell(), self.LARGE + 2)
308 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000309 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou66f9fea2010-01-31 23:20:26 +0000310 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000311 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
312 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000313 self.assertEqual(f.read(2), b"x")
314
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000315 def test_invalid_operations(self):
316 # Try writing on a file opened in read mode and vice-versa.
317 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000318 with self.open(support.TESTFN, mode) as fp:
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000319 self.assertRaises(IOError, fp.read)
320 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000321 with self.open(support.TESTFN, "rb") as fp:
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000322 self.assertRaises(IOError, fp.write, b"blah")
323 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000324 with self.open(support.TESTFN, "r") as fp:
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000325 self.assertRaises(IOError, fp.write, "blah")
326 self.assertRaises(IOError, fp.writelines, ["blah\n"])
327
Guido van Rossum28524c72007-02-27 05:47:44 +0000328 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000329 with self.open(support.TESTFN, "wb", buffering=0) as f:
330 self.assertEqual(f.readable(), False)
331 self.assertEqual(f.writable(), True)
332 self.assertEqual(f.seekable(), True)
333 self.write_ops(f)
334 with self.open(support.TESTFN, "rb", buffering=0) as f:
335 self.assertEqual(f.readable(), True)
336 self.assertEqual(f.writable(), False)
337 self.assertEqual(f.seekable(), True)
338 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000339
Guido van Rossum87429772007-04-10 21:06:59 +0000340 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000341 with self.open(support.TESTFN, "wb") as f:
342 self.assertEqual(f.readable(), False)
343 self.assertEqual(f.writable(), True)
344 self.assertEqual(f.seekable(), True)
345 self.write_ops(f)
346 with self.open(support.TESTFN, "rb") as f:
347 self.assertEqual(f.readable(), True)
348 self.assertEqual(f.writable(), False)
349 self.assertEqual(f.seekable(), True)
350 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000351
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000352 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000353 with self.open(support.TESTFN, "wb") as f:
354 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
355 with self.open(support.TESTFN, "rb") as f:
356 self.assertEqual(f.readline(), b"abc\n")
357 self.assertEqual(f.readline(10), b"def\n")
358 self.assertEqual(f.readline(2), b"xy")
359 self.assertEqual(f.readline(4), b"zzy\n")
360 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Peterson6b59f772009-12-13 19:30:15 +0000361 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000362 self.assertRaises(TypeError, f.readline, 5.3)
363 with self.open(support.TESTFN, "r") as f:
364 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000365
Guido van Rossum28524c72007-02-27 05:47:44 +0000366 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000367 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000368 self.write_ops(f)
369 data = f.getvalue()
370 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000371 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000372 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000373
Guido van Rossum53807da2007-04-10 19:01:47 +0000374 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000375 # On Windows and Mac OSX this test comsumes large resources; It takes
376 # a long time to build the >2GB file and takes >2GB of disk space
377 # therefore the resource must be enabled to run this test.
378 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000379 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000380 print("\nTesting large file ops skipped on %s." % sys.platform,
381 file=sys.stderr)
382 print("It requires %d bytes and a long time." % self.LARGE,
383 file=sys.stderr)
384 print("Use 'regrtest.py -u largefile test_io' to run it.",
385 file=sys.stderr)
386 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000387 with self.open(support.TESTFN, "w+b", 0) as f:
388 self.large_file_ops(f)
389 with self.open(support.TESTFN, "w+b") as f:
390 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000391
392 def test_with_open(self):
393 for bufsize in (0, 1, 100):
394 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000395 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000396 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000397 self.assertEqual(f.closed, True)
398 f = None
399 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000400 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000401 1/0
402 except ZeroDivisionError:
403 self.assertEqual(f.closed, True)
404 else:
405 self.fail("1/0 didn't raise an exception")
406
Antoine Pitrou08838b62009-01-21 00:55:13 +0000407 # issue 5008
408 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000409 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000410 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000411 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000412 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000413 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000414 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000415 with self.open(support.TESTFN, "a") as f:
Georg Brandlab91fde2009-08-13 08:51:18 +0000416 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000417
Guido van Rossum87429772007-04-10 21:06:59 +0000418 def test_destructor(self):
419 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000420 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000421 def __del__(self):
422 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000423 try:
424 f = super().__del__
425 except AttributeError:
426 pass
427 else:
428 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000429 def close(self):
430 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000431 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000432 def flush(self):
433 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000434 super().flush()
435 f = MyFileIO(support.TESTFN, "wb")
436 f.write(b"xxx")
437 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000438 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000439 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000440 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000441 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000442
443 def _check_base_destructor(self, base):
444 record = []
445 class MyIO(base):
446 def __init__(self):
447 # This exercises the availability of attributes on object
448 # destruction.
449 # (in the C version, close() is called by the tp_dealloc
450 # function, not by __del__)
451 self.on_del = 1
452 self.on_close = 2
453 self.on_flush = 3
454 def __del__(self):
455 record.append(self.on_del)
456 try:
457 f = super().__del__
458 except AttributeError:
459 pass
460 else:
461 f()
462 def close(self):
463 record.append(self.on_close)
464 super().close()
465 def flush(self):
466 record.append(self.on_flush)
467 super().flush()
468 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000469 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000470 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000471 self.assertEqual(record, [1, 2, 3])
472
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000473 def test_IOBase_destructor(self):
474 self._check_base_destructor(self.IOBase)
475
476 def test_RawIOBase_destructor(self):
477 self._check_base_destructor(self.RawIOBase)
478
479 def test_BufferedIOBase_destructor(self):
480 self._check_base_destructor(self.BufferedIOBase)
481
482 def test_TextIOBase_destructor(self):
483 self._check_base_destructor(self.TextIOBase)
484
Guido van Rossum87429772007-04-10 21:06:59 +0000485 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000486 with self.open(support.TESTFN, "wb") as f:
487 f.write(b"xxx")
488 with self.open(support.TESTFN, "rb") as f:
489 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000490
Guido van Rossumd4103952007-04-12 05:44:49 +0000491 def test_array_writes(self):
492 a = array.array('i', range(10))
Antoine Pitrouc3b39242009-01-03 16:59:18 +0000493 n = len(a.tostring())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000494 with self.open(support.TESTFN, "wb", 0) as f:
495 self.assertEqual(f.write(a), n)
496 with self.open(support.TESTFN, "wb") as f:
497 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000498
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000499 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000500 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000501 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000502
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000503 def test_read_closed(self):
504 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000505 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000506 with self.open(support.TESTFN, "r") as f:
507 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000508 self.assertEqual(file.read(), "egg\n")
509 file.seek(0)
510 file.close()
511 self.assertRaises(ValueError, file.read)
512
513 def test_no_closefd_with_filename(self):
514 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000515 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000516
517 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000518 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000519 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000520 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000521 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000522 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000523 self.assertEqual(file.buffer.raw.closefd, False)
524
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000525 def test_garbage_collection(self):
526 # FileIO objects are collected, and collecting them flushes
527 # all data to disk.
528 f = self.FileIO(support.TESTFN, "wb")
529 f.write(b"abcxxx")
530 f.f = f
531 wr = weakref.ref(f)
532 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000533 support.gc_collect()
Georg Brandlab91fde2009-08-13 08:51:18 +0000534 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000535 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000536 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000537
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000538 def test_unbounded_file(self):
539 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
540 zero = "/dev/zero"
541 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000542 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000543 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000544 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000545 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000546 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000547 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000548 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000549 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000550 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000551 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000552 self.assertRaises(OverflowError, f.read)
553
Antoine Pitroufaf90072010-05-03 16:58:19 +0000554 def test_flush_error_on_close(self):
555 f = self.open(support.TESTFN, "wb", buffering=0)
556 def bad_flush():
557 raise IOError()
558 f.flush = bad_flush
559 self.assertRaises(IOError, f.close) # exception not swallowed
560
561 def test_multi_close(self):
562 f = self.open(support.TESTFN, "wb", buffering=0)
563 f.close()
564 f.close()
565 f.close()
566 self.assertRaises(ValueError, f.flush)
567
Antoine Pitroue5e75c62010-09-14 18:53:07 +0000568 def test_RawIOBase_read(self):
569 # Exercise the default RawIOBase.read() implementation (which calls
570 # readinto() internally).
571 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
572 self.assertEqual(rawio.read(2), b"ab")
573 self.assertEqual(rawio.read(2), b"c")
574 self.assertEqual(rawio.read(2), b"d")
575 self.assertEqual(rawio.read(2), None)
576 self.assertEqual(rawio.read(2), b"ef")
577 self.assertEqual(rawio.read(2), b"g")
578 self.assertEqual(rawio.read(2), None)
579 self.assertEqual(rawio.read(2), b"")
580
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000581class CIOTest(IOTest):
582 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000583
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000584class PyIOTest(IOTest):
585 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000586
Guido van Rossuma9e20242007-03-08 00:43:48 +0000587
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000588class CommonBufferedTests:
589 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
590
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000591 def test_detach(self):
592 raw = self.MockRawIO()
593 buf = self.tp(raw)
594 self.assertIs(buf.detach(), raw)
595 self.assertRaises(ValueError, buf.detach)
596
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000597 def test_fileno(self):
598 rawio = self.MockRawIO()
599 bufio = self.tp(rawio)
600
Ezio Melotti19f2aeb2010-11-21 01:30:29 +0000601 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000602
603 def test_no_fileno(self):
604 # XXX will we always have fileno() function? If so, kill
605 # this test. Else, write it.
606 pass
607
608 def test_invalid_args(self):
609 rawio = self.MockRawIO()
610 bufio = self.tp(rawio)
611 # Invalid whence
612 self.assertRaises(ValueError, bufio.seek, 0, -1)
613 self.assertRaises(ValueError, bufio.seek, 0, 3)
614
615 def test_override_destructor(self):
616 tp = self.tp
617 record = []
618 class MyBufferedIO(tp):
619 def __del__(self):
620 record.append(1)
621 try:
622 f = super().__del__
623 except AttributeError:
624 pass
625 else:
626 f()
627 def close(self):
628 record.append(2)
629 super().close()
630 def flush(self):
631 record.append(3)
632 super().flush()
633 rawio = self.MockRawIO()
634 bufio = MyBufferedIO(rawio)
635 writable = bufio.writable()
636 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000637 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000638 if writable:
639 self.assertEqual(record, [1, 2, 3])
640 else:
641 self.assertEqual(record, [1, 2])
642
643 def test_context_manager(self):
644 # Test usability as a context manager
645 rawio = self.MockRawIO()
646 bufio = self.tp(rawio)
647 def _with():
648 with bufio:
649 pass
650 _with()
651 # bufio should now be closed, and using it a second time should raise
652 # a ValueError.
653 self.assertRaises(ValueError, _with)
654
655 def test_error_through_destructor(self):
656 # Test that the exception state is not modified by a destructor,
657 # even if close() fails.
658 rawio = self.CloseFailureIO()
659 def f():
660 self.tp(rawio).xyzzy
661 with support.captured_output("stderr") as s:
662 self.assertRaises(AttributeError, f)
663 s = s.getvalue().strip()
664 if s:
665 # The destructor *may* have printed an unraisable error, check it
666 self.assertEqual(len(s.splitlines()), 1)
Georg Brandlab91fde2009-08-13 08:51:18 +0000667 self.assertTrue(s.startswith("Exception IOError: "), s)
668 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000669
Antoine Pitrou716c4442009-05-23 19:04:03 +0000670 def test_repr(self):
671 raw = self.MockRawIO()
672 b = self.tp(raw)
673 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
674 self.assertEqual(repr(b), "<%s>" % clsname)
675 raw.name = "dummy"
676 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
677 raw.name = b"dummy"
678 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
679
Antoine Pitroufaf90072010-05-03 16:58:19 +0000680 def test_flush_error_on_close(self):
681 raw = self.MockRawIO()
682 def bad_flush():
683 raise IOError()
684 raw.flush = bad_flush
685 b = self.tp(raw)
686 self.assertRaises(IOError, b.close) # exception not swallowed
687
688 def test_multi_close(self):
689 raw = self.MockRawIO()
690 b = self.tp(raw)
691 b.close()
692 b.close()
693 b.close()
694 self.assertRaises(ValueError, b.flush)
695
Antoine Pitrou6cfc5122010-12-21 21:26:09 +0000696 def test_readonly_attributes(self):
697 raw = self.MockRawIO()
698 buf = self.tp(raw)
699 x = self.MockRawIO()
700 with self.assertRaises(AttributeError):
701 buf.raw = x
702
Guido van Rossum78892e42007-04-06 17:31:18 +0000703
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000704class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
705 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000706
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000707 def test_constructor(self):
708 rawio = self.MockRawIO([b"abc"])
709 bufio = self.tp(rawio)
710 bufio.__init__(rawio)
711 bufio.__init__(rawio, buffer_size=1024)
712 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +0000713 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000714 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
715 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
716 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
717 rawio = self.MockRawIO([b"abc"])
718 bufio.__init__(rawio)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +0000719 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000720
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000721 def test_read(self):
Benjamin Peterson6b59f772009-12-13 19:30:15 +0000722 for arg in (None, 7):
723 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
724 bufio = self.tp(rawio)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +0000725 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000726 # Invalid args
727 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000728
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000729 def test_read1(self):
730 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
731 bufio = self.tp(rawio)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +0000732 self.assertEqual(b"a", bufio.read(1))
733 self.assertEqual(b"b", bufio.read1(1))
734 self.assertEqual(rawio._reads, 1)
735 self.assertEqual(b"c", bufio.read1(100))
736 self.assertEqual(rawio._reads, 1)
737 self.assertEqual(b"d", bufio.read1(100))
738 self.assertEqual(rawio._reads, 2)
739 self.assertEqual(b"efg", bufio.read1(100))
740 self.assertEqual(rawio._reads, 3)
741 self.assertEqual(b"", bufio.read1(100))
742 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000743 # Invalid args
744 self.assertRaises(ValueError, bufio.read1, -1)
745
746 def test_readinto(self):
747 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
748 bufio = self.tp(rawio)
749 b = bytearray(2)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +0000750 self.assertEqual(bufio.readinto(b), 2)
751 self.assertEqual(b, b"ab")
752 self.assertEqual(bufio.readinto(b), 2)
753 self.assertEqual(b, b"cd")
754 self.assertEqual(bufio.readinto(b), 2)
755 self.assertEqual(b, b"ef")
756 self.assertEqual(bufio.readinto(b), 1)
757 self.assertEqual(b, b"gf")
758 self.assertEqual(bufio.readinto(b), 0)
759 self.assertEqual(b, b"gf")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000760
Benjamin Peterson6b59f772009-12-13 19:30:15 +0000761 def test_readlines(self):
762 def bufio():
763 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
764 return self.tp(rawio)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +0000765 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
766 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
767 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Peterson6b59f772009-12-13 19:30:15 +0000768
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000769 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000770 data = b"abcdefghi"
771 dlen = len(data)
772
773 tests = [
774 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
775 [ 100, [ 3, 3, 3], [ dlen ] ],
776 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
777 ]
778
779 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000780 rawio = self.MockFileIO(data)
781 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000782 pos = 0
783 for nbytes in buf_read_sizes:
Ezio Melotti19f2aeb2010-11-21 01:30:29 +0000784 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000785 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000786 # this is mildly implementation-dependent
Ezio Melotti19f2aeb2010-11-21 01:30:29 +0000787 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000788
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000789 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000790 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000791 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
792 bufio = self.tp(rawio)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +0000793 self.assertEqual(b"abcd", bufio.read(6))
794 self.assertEqual(b"e", bufio.read(1))
795 self.assertEqual(b"fg", bufio.read())
796 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +0200797 self.assertIsNone(bufio.read())
Ezio Melotti19f2aeb2010-11-21 01:30:29 +0000798 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000799
Victor Stinnera80987f2011-05-25 22:47:16 +0200800 rawio = self.MockRawIO((b"a", None, None))
801 self.assertEqual(b"a", rawio.readall())
802 self.assertIsNone(rawio.readall())
803
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000804 def test_read_past_eof(self):
805 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
806 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000807
Ezio Melotti19f2aeb2010-11-21 01:30:29 +0000808 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000809
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000810 def test_read_all(self):
811 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
812 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000813
Ezio Melotti19f2aeb2010-11-21 01:30:29 +0000814 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000815
Antoine Pitrou27314942010-10-14 15:41:23 +0000816 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000817 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000818 try:
819 # Write out many bytes with exactly the same number of 0's,
820 # 1's... 255's. This will help us check that concurrent reading
821 # doesn't duplicate or forget contents.
822 N = 1000
823 l = list(range(256)) * N
824 random.shuffle(l)
825 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000826 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +0000827 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000828 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000829 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000830 errors = []
831 results = []
832 def f():
833 try:
834 # Intra-buffer read then buffer-flushing read
835 for n in cycle([1, 19]):
836 s = bufio.read(n)
837 if not s:
838 break
839 # list.append() is atomic
840 results.append(s)
841 except Exception as e:
842 errors.append(e)
843 raise
844 threads = [threading.Thread(target=f) for x in range(20)]
845 for t in threads:
846 t.start()
847 time.sleep(0.02) # yield
848 for t in threads:
849 t.join()
850 self.assertFalse(errors,
851 "the following exceptions were caught: %r" % errors)
852 s = b''.join(results)
853 for i in range(256):
854 c = bytes(bytearray([i]))
855 self.assertEqual(s.count(c), N)
856 finally:
857 support.unlink(support.TESTFN)
858
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000859 def test_misbehaved_io(self):
860 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
861 bufio = self.tp(rawio)
862 self.assertRaises(IOError, bufio.seek, 0)
863 self.assertRaises(IOError, bufio.tell)
864
Antoine Pitrou00091ca2010-08-11 13:38:10 +0000865 def test_no_extraneous_read(self):
866 # Issue #9550; when the raw IO object has satisfied the read request,
867 # we should not issue any additional reads, otherwise it may block
868 # (e.g. socket).
869 bufsize = 16
870 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
871 rawio = self.MockRawIO([b"x" * n])
872 bufio = self.tp(rawio, bufsize)
873 self.assertEqual(bufio.read(n), b"x" * n)
874 # Simple case: one raw read is enough to satisfy the request.
875 self.assertEqual(rawio._extraneous_reads, 0,
876 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
877 # A more complex case where two raw reads are needed to satisfy
878 # the request.
879 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
880 bufio = self.tp(rawio, bufsize)
881 self.assertEqual(bufio.read(n), b"x" * n)
882 self.assertEqual(rawio._extraneous_reads, 0,
883 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
884
885
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000886class CBufferedReaderTest(BufferedReaderTest):
887 tp = io.BufferedReader
888
889 def test_constructor(self):
890 BufferedReaderTest.test_constructor(self)
891 # The allocation can succeed on 32-bit builds, e.g. with more
892 # than 2GB RAM and a 64-bit kernel.
893 if sys.maxsize > 0x7FFFFFFF:
894 rawio = self.MockRawIO()
895 bufio = self.tp(rawio)
896 self.assertRaises((OverflowError, MemoryError, ValueError),
897 bufio.__init__, rawio, sys.maxsize)
898
899 def test_initialization(self):
900 rawio = self.MockRawIO([b"abc"])
901 bufio = self.tp(rawio)
902 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
903 self.assertRaises(ValueError, bufio.read)
904 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
905 self.assertRaises(ValueError, bufio.read)
906 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
907 self.assertRaises(ValueError, bufio.read)
908
909 def test_misbehaved_io_read(self):
910 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
911 bufio = self.tp(rawio)
912 # _pyio.BufferedReader seems to implement reading different, so that
913 # checking this is not so easy.
914 self.assertRaises(IOError, bufio.read, 10)
915
916 def test_garbage_collection(self):
917 # C BufferedReader objects are collected.
918 # The Python version has __del__, so it ends into gc.garbage instead
919 rawio = self.FileIO(support.TESTFN, "w+b")
920 f = self.tp(rawio)
921 f.f = f
922 wr = weakref.ref(f)
923 del f
Benjamin Peterson45cec322009-04-24 23:14:50 +0000924 support.gc_collect()
Georg Brandlab91fde2009-08-13 08:51:18 +0000925 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000926
927class PyBufferedReaderTest(BufferedReaderTest):
928 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000929
Guido van Rossuma9e20242007-03-08 00:43:48 +0000930
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000931class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
932 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +0000933
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000934 def test_constructor(self):
935 rawio = self.MockRawIO()
936 bufio = self.tp(rawio)
937 bufio.__init__(rawio)
938 bufio.__init__(rawio, buffer_size=1024)
939 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +0000940 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000941 bufio.flush()
942 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
943 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
944 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
945 bufio.__init__(rawio)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +0000946 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000947 bufio.flush()
Ezio Melotti19f2aeb2010-11-21 01:30:29 +0000948 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000949
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000950 def test_detach_flush(self):
951 raw = self.MockRawIO()
952 buf = self.tp(raw)
953 buf.write(b"howdy!")
954 self.assertFalse(raw._write_stack)
955 buf.detach()
956 self.assertEqual(raw._write_stack, [b"howdy!"])
957
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000958 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000959 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000960 writer = self.MockRawIO()
961 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000962 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000963 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000964
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000965 def test_write_overflow(self):
966 writer = self.MockRawIO()
967 bufio = self.tp(writer, 8)
968 contents = b"abcdefghijklmnop"
969 for n in range(0, len(contents), 3):
970 bufio.write(contents[n:n+3])
971 flushed = b"".join(writer._write_stack)
972 # At least (total - 8) bytes were implicitly flushed, perhaps more
973 # depending on the implementation.
Georg Brandlab91fde2009-08-13 08:51:18 +0000974 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000975
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000976 def check_writes(self, intermediate_func):
977 # Lots of writes, test the flushed output is as expected.
978 contents = bytes(range(256)) * 1000
979 n = 0
980 writer = self.MockRawIO()
981 bufio = self.tp(writer, 13)
982 # Generator of write sizes: repeat each N 15 times then proceed to N+1
983 def gen_sizes():
984 for size in count(1):
985 for i in range(15):
986 yield size
987 sizes = gen_sizes()
988 while n < len(contents):
989 size = min(next(sizes), len(contents) - n)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +0000990 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000991 intermediate_func(bufio)
992 n += size
993 bufio.flush()
Ezio Melotti19f2aeb2010-11-21 01:30:29 +0000994 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000995
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000996 def test_writes(self):
997 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000998
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000999 def test_writes_and_flushes(self):
1000 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001001
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001002 def test_writes_and_seeks(self):
1003 def _seekabs(bufio):
1004 pos = bufio.tell()
1005 bufio.seek(pos + 1, 0)
1006 bufio.seek(pos - 1, 0)
1007 bufio.seek(pos, 0)
1008 self.check_writes(_seekabs)
1009 def _seekrel(bufio):
1010 pos = bufio.seek(0, 1)
1011 bufio.seek(+1, 1)
1012 bufio.seek(-1, 1)
1013 bufio.seek(pos, 0)
1014 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001015
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001016 def test_writes_and_truncates(self):
1017 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001018
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001019 def test_write_non_blocking(self):
1020 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001021 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001022
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001023 self.assertEqual(bufio.write(b"abcd"), 4)
1024 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001025 # 1 byte will be written, the rest will be buffered
1026 raw.block_on(b"k")
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001027 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001028
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001029 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1030 raw.block_on(b"0")
1031 try:
1032 bufio.write(b"opqrwxyz0123456789")
1033 except self.BlockingIOError as e:
1034 written = e.characters_written
1035 else:
1036 self.fail("BlockingIOError should have been raised")
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001037 self.assertEqual(written, 16)
1038 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001039 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001040
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001041 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001042 s = raw.pop_written()
1043 # Previously buffered bytes were flushed
1044 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001045
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001046 def test_write_and_rewind(self):
1047 raw = io.BytesIO()
1048 bufio = self.tp(raw, 4)
1049 self.assertEqual(bufio.write(b"abcdef"), 6)
1050 self.assertEqual(bufio.tell(), 6)
1051 bufio.seek(0, 0)
1052 self.assertEqual(bufio.write(b"XY"), 2)
1053 bufio.seek(6, 0)
1054 self.assertEqual(raw.getvalue(), b"XYcdef")
1055 self.assertEqual(bufio.write(b"123456"), 6)
1056 bufio.flush()
1057 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001058
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001059 def test_flush(self):
1060 writer = self.MockRawIO()
1061 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001062 bufio.write(b"abc")
1063 bufio.flush()
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001064 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001065
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001066 def test_destructor(self):
1067 writer = self.MockRawIO()
1068 bufio = self.tp(writer, 8)
1069 bufio.write(b"abc")
1070 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001071 support.gc_collect()
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001072 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001073
1074 def test_truncate(self):
1075 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001076 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001077 bufio = self.tp(raw, 8)
1078 bufio.write(b"abcdef")
1079 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou66f9fea2010-01-31 23:20:26 +00001080 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001081 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001082 self.assertEqual(f.read(), b"abc")
1083
Antoine Pitrou27314942010-10-14 15:41:23 +00001084 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001085 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001086 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001087 # Write out many bytes from many threads and test they were
1088 # all flushed.
1089 N = 1000
1090 contents = bytes(range(256)) * N
1091 sizes = cycle([1, 19])
1092 n = 0
1093 queue = deque()
1094 while n < len(contents):
1095 size = next(sizes)
1096 queue.append(contents[n:n+size])
1097 n += size
1098 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001099 # We use a real file object because it allows us to
1100 # exercise situations where the GIL is released before
1101 # writing the buffer to the raw streams. This is in addition
1102 # to concurrency issues due to switching threads in the middle
1103 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001104 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001105 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001106 errors = []
1107 def f():
1108 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001109 while True:
1110 try:
1111 s = queue.popleft()
1112 except IndexError:
1113 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001114 bufio.write(s)
1115 except Exception as e:
1116 errors.append(e)
1117 raise
1118 threads = [threading.Thread(target=f) for x in range(20)]
1119 for t in threads:
1120 t.start()
1121 time.sleep(0.02) # yield
1122 for t in threads:
1123 t.join()
1124 self.assertFalse(errors,
1125 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001126 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001127 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001128 s = f.read()
1129 for i in range(256):
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001130 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001131 finally:
1132 support.unlink(support.TESTFN)
1133
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001134 def test_misbehaved_io(self):
1135 rawio = self.MisbehavedRawIO()
1136 bufio = self.tp(rawio, 5)
1137 self.assertRaises(IOError, bufio.seek, 0)
1138 self.assertRaises(IOError, bufio.tell)
1139 self.assertRaises(IOError, bufio.write, b"abcdef")
1140
Benjamin Peterson59406a92009-03-26 17:10:29 +00001141 def test_max_buffer_size_deprecation(self):
1142 with support.check_warnings() as w:
1143 warnings.simplefilter("always", DeprecationWarning)
1144 self.tp(self.MockRawIO(), 8, 12)
1145 self.assertEqual(len(w.warnings), 1)
1146 warning = w.warnings[0]
1147 self.assertTrue(warning.category is DeprecationWarning)
1148 self.assertEqual(str(warning.message),
1149 "max_buffer_size is deprecated")
1150
1151
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001152class CBufferedWriterTest(BufferedWriterTest):
1153 tp = io.BufferedWriter
1154
1155 def test_constructor(self):
1156 BufferedWriterTest.test_constructor(self)
1157 # The allocation can succeed on 32-bit builds, e.g. with more
1158 # than 2GB RAM and a 64-bit kernel.
1159 if sys.maxsize > 0x7FFFFFFF:
1160 rawio = self.MockRawIO()
1161 bufio = self.tp(rawio)
1162 self.assertRaises((OverflowError, MemoryError, ValueError),
1163 bufio.__init__, rawio, sys.maxsize)
1164
1165 def test_initialization(self):
1166 rawio = self.MockRawIO()
1167 bufio = self.tp(rawio)
1168 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1169 self.assertRaises(ValueError, bufio.write, b"def")
1170 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1171 self.assertRaises(ValueError, bufio.write, b"def")
1172 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1173 self.assertRaises(ValueError, bufio.write, b"def")
1174
1175 def test_garbage_collection(self):
1176 # C BufferedWriter objects are collected, and collecting them flushes
1177 # all data to disk.
1178 # The Python version has __del__, so it ends into gc.garbage instead
1179 rawio = self.FileIO(support.TESTFN, "w+b")
1180 f = self.tp(rawio)
1181 f.write(b"123xxx")
1182 f.x = f
1183 wr = weakref.ref(f)
1184 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001185 support.gc_collect()
Georg Brandlab91fde2009-08-13 08:51:18 +00001186 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001187 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001188 self.assertEqual(f.read(), b"123xxx")
1189
1190
1191class PyBufferedWriterTest(BufferedWriterTest):
1192 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001193
Guido van Rossum01a27522007-03-07 01:00:12 +00001194class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001195
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001196 def test_constructor(self):
1197 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001198 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001199
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001200 def test_detach(self):
1201 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1202 self.assertRaises(self.UnsupportedOperation, pair.detach)
1203
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001204 def test_constructor_max_buffer_size_deprecation(self):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001205 with support.check_warnings() as w:
1206 warnings.simplefilter("always", DeprecationWarning)
1207 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
1208 self.assertEqual(len(w.warnings), 1)
1209 warning = w.warnings[0]
1210 self.assertTrue(warning.category is DeprecationWarning)
1211 self.assertEqual(str(warning.message),
1212 "max_buffer_size is deprecated")
1213
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001214 def test_constructor_with_not_readable(self):
1215 class NotReadable(MockRawIO):
1216 def readable(self):
1217 return False
1218
1219 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1220
1221 def test_constructor_with_not_writeable(self):
1222 class NotWriteable(MockRawIO):
1223 def writable(self):
1224 return False
1225
1226 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1227
1228 def test_read(self):
1229 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1230
1231 self.assertEqual(pair.read(3), b"abc")
1232 self.assertEqual(pair.read(1), b"d")
1233 self.assertEqual(pair.read(), b"ef")
Benjamin Peterson6b59f772009-12-13 19:30:15 +00001234 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1235 self.assertEqual(pair.read(None), b"abc")
1236
1237 def test_readlines(self):
1238 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1239 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1240 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1241 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001242
1243 def test_read1(self):
1244 # .read1() is delegated to the underlying reader object, so this test
1245 # can be shallow.
1246 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1247
1248 self.assertEqual(pair.read1(3), b"abc")
1249
1250 def test_readinto(self):
1251 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1252
1253 data = bytearray(5)
1254 self.assertEqual(pair.readinto(data), 5)
1255 self.assertEqual(data, b"abcde")
1256
1257 def test_write(self):
1258 w = self.MockRawIO()
1259 pair = self.tp(self.MockRawIO(), w)
1260
1261 pair.write(b"abc")
1262 pair.flush()
1263 pair.write(b"def")
1264 pair.flush()
1265 self.assertEqual(w._write_stack, [b"abc", b"def"])
1266
1267 def test_peek(self):
1268 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1269
1270 self.assertTrue(pair.peek(3).startswith(b"abc"))
1271 self.assertEqual(pair.read(3), b"abc")
1272
1273 def test_readable(self):
1274 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1275 self.assertTrue(pair.readable())
1276
1277 def test_writeable(self):
1278 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1279 self.assertTrue(pair.writable())
1280
1281 def test_seekable(self):
1282 # BufferedRWPairs are never seekable, even if their readers and writers
1283 # are.
1284 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1285 self.assertFalse(pair.seekable())
1286
1287 # .flush() is delegated to the underlying writer object and has been
1288 # tested in the test_write method.
1289
1290 def test_close_and_closed(self):
1291 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1292 self.assertFalse(pair.closed)
1293 pair.close()
1294 self.assertTrue(pair.closed)
1295
1296 def test_isatty(self):
1297 class SelectableIsAtty(MockRawIO):
1298 def __init__(self, isatty):
1299 MockRawIO.__init__(self)
1300 self._isatty = isatty
1301
1302 def isatty(self):
1303 return self._isatty
1304
1305 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1306 self.assertFalse(pair.isatty())
1307
1308 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1309 self.assertTrue(pair.isatty())
1310
1311 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1312 self.assertTrue(pair.isatty())
1313
1314 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1315 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001316
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001317class CBufferedRWPairTest(BufferedRWPairTest):
1318 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001319
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001320class PyBufferedRWPairTest(BufferedRWPairTest):
1321 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001322
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001323
1324class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1325 read_mode = "rb+"
1326 write_mode = "wb+"
1327
1328 def test_constructor(self):
1329 BufferedReaderTest.test_constructor(self)
1330 BufferedWriterTest.test_constructor(self)
1331
1332 def test_read_and_write(self):
1333 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001334 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001335
1336 self.assertEqual(b"as", rw.read(2))
1337 rw.write(b"ddd")
1338 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001339 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001340 self.assertEqual(b"ghjk", rw.read())
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001341 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001342
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001343 def test_seek_and_tell(self):
1344 raw = self.BytesIO(b"asdfghjkl")
1345 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001346
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001347 self.assertEqual(b"as", rw.read(2))
1348 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001349 rw.seek(0, 0)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001350 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00001351
1352 rw.write(b"asdf")
1353 rw.seek(0, 0)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001354 self.assertEqual(b"asdfasdfl", rw.read())
1355 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001356 rw.seek(-4, 2)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001357 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001358 rw.seek(2, 1)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001359 self.assertEqual(7, rw.tell())
1360 self.assertEqual(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001361 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001362
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001363 def check_flush_and_read(self, read_func):
1364 raw = self.BytesIO(b"abcdefghi")
1365 bufio = self.tp(raw)
1366
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001367 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001368 bufio.write(b"12")
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001369 self.assertEqual(b"ef", read_func(bufio, 2))
1370 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001371 bufio.flush()
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001372 self.assertEqual(6, bufio.tell())
1373 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001374 raw.seek(0, 0)
1375 raw.write(b"XYZ")
1376 # flush() resets the read buffer
1377 bufio.flush()
1378 bufio.seek(0, 0)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001379 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001380
1381 def test_flush_and_read(self):
1382 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1383
1384 def test_flush_and_readinto(self):
1385 def _readinto(bufio, n=-1):
1386 b = bytearray(n if n >= 0 else 9999)
1387 n = bufio.readinto(b)
1388 return bytes(b[:n])
1389 self.check_flush_and_read(_readinto)
1390
1391 def test_flush_and_peek(self):
1392 def _peek(bufio, n=-1):
1393 # This relies on the fact that the buffer can contain the whole
1394 # raw stream, otherwise peek() can return less.
1395 b = bufio.peek(n)
1396 if n != -1:
1397 b = b[:n]
1398 bufio.seek(len(b), 1)
1399 return b
1400 self.check_flush_and_read(_peek)
1401
1402 def test_flush_and_write(self):
1403 raw = self.BytesIO(b"abcdefghi")
1404 bufio = self.tp(raw)
1405
1406 bufio.write(b"123")
1407 bufio.flush()
1408 bufio.write(b"45")
1409 bufio.flush()
1410 bufio.seek(0, 0)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001411 self.assertEqual(b"12345fghi", raw.getvalue())
1412 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001413
1414 def test_threads(self):
1415 BufferedReaderTest.test_threads(self)
1416 BufferedWriterTest.test_threads(self)
1417
1418 def test_writes_and_peek(self):
1419 def _peek(bufio):
1420 bufio.peek(1)
1421 self.check_writes(_peek)
1422 def _peek(bufio):
1423 pos = bufio.tell()
1424 bufio.seek(-1, 1)
1425 bufio.peek(1)
1426 bufio.seek(pos, 0)
1427 self.check_writes(_peek)
1428
1429 def test_writes_and_reads(self):
1430 def _read(bufio):
1431 bufio.seek(-1, 1)
1432 bufio.read(1)
1433 self.check_writes(_read)
1434
1435 def test_writes_and_read1s(self):
1436 def _read1(bufio):
1437 bufio.seek(-1, 1)
1438 bufio.read1(1)
1439 self.check_writes(_read1)
1440
1441 def test_writes_and_readintos(self):
1442 def _read(bufio):
1443 bufio.seek(-1, 1)
1444 bufio.readinto(bytearray(1))
1445 self.check_writes(_read)
1446
Antoine Pitrou0473e562009-08-06 20:52:43 +00001447 def test_write_after_readahead(self):
1448 # Issue #6629: writing after the buffer was filled by readahead should
1449 # first rewind the raw stream.
1450 for overwrite_size in [1, 5]:
1451 raw = self.BytesIO(b"A" * 10)
1452 bufio = self.tp(raw, 4)
1453 # Trigger readahead
1454 self.assertEqual(bufio.read(1), b"A")
1455 self.assertEqual(bufio.tell(), 1)
1456 # Overwriting should rewind the raw stream if it needs so
1457 bufio.write(b"B" * overwrite_size)
1458 self.assertEqual(bufio.tell(), overwrite_size + 1)
1459 # If the write size was smaller than the buffer size, flush() and
1460 # check that rewind happens.
1461 bufio.flush()
1462 self.assertEqual(bufio.tell(), overwrite_size + 1)
1463 s = raw.getvalue()
1464 self.assertEqual(s,
1465 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1466
Antoine Pitrou7c404892011-05-13 00:13:33 +02001467 def test_write_rewind_write(self):
1468 # Various combinations of reading / writing / seeking backwards / writing again
1469 def mutate(bufio, pos1, pos2):
1470 assert pos2 >= pos1
1471 # Fill the buffer
1472 bufio.seek(pos1)
1473 bufio.read(pos2 - pos1)
1474 bufio.write(b'\x02')
1475 # This writes earlier than the previous write, but still inside
1476 # the buffer.
1477 bufio.seek(pos1)
1478 bufio.write(b'\x01')
1479
1480 b = b"\x80\x81\x82\x83\x84"
1481 for i in range(0, len(b)):
1482 for j in range(i, len(b)):
1483 raw = self.BytesIO(b)
1484 bufio = self.tp(raw, 100)
1485 mutate(bufio, i, j)
1486 bufio.flush()
1487 expected = bytearray(b)
1488 expected[j] = 2
1489 expected[i] = 1
1490 self.assertEqual(raw.getvalue(), expected,
1491 "failed result for i=%d, j=%d" % (i, j))
1492
Antoine Pitrou66f9fea2010-01-31 23:20:26 +00001493 def test_truncate_after_read_or_write(self):
1494 raw = self.BytesIO(b"A" * 10)
1495 bufio = self.tp(raw, 100)
1496 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1497 self.assertEqual(bufio.truncate(), 2)
1498 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1499 self.assertEqual(bufio.truncate(), 4)
1500
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001501 def test_misbehaved_io(self):
1502 BufferedReaderTest.test_misbehaved_io(self)
1503 BufferedWriterTest.test_misbehaved_io(self)
1504
1505class CBufferedRandomTest(BufferedRandomTest):
1506 tp = io.BufferedRandom
1507
1508 def test_constructor(self):
1509 BufferedRandomTest.test_constructor(self)
1510 # The allocation can succeed on 32-bit builds, e.g. with more
1511 # than 2GB RAM and a 64-bit kernel.
1512 if sys.maxsize > 0x7FFFFFFF:
1513 rawio = self.MockRawIO()
1514 bufio = self.tp(rawio)
1515 self.assertRaises((OverflowError, MemoryError, ValueError),
1516 bufio.__init__, rawio, sys.maxsize)
1517
1518 def test_garbage_collection(self):
1519 CBufferedReaderTest.test_garbage_collection(self)
1520 CBufferedWriterTest.test_garbage_collection(self)
1521
1522class PyBufferedRandomTest(BufferedRandomTest):
1523 tp = pyio.BufferedRandom
1524
1525
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001526# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1527# properties:
1528# - A single output character can correspond to many bytes of input.
1529# - The number of input bytes to complete the character can be
1530# undetermined until the last input byte is received.
1531# - The number of input bytes can vary depending on previous input.
1532# - A single input byte can correspond to many characters of output.
1533# - The number of output characters can be undetermined until the
1534# last input byte is received.
1535# - The number of output characters can vary depending on previous input.
1536
1537class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1538 """
1539 For testing seek/tell behavior with a stateful, buffering decoder.
1540
1541 Input is a sequence of words. Words may be fixed-length (length set
1542 by input) or variable-length (period-terminated). In variable-length
1543 mode, extra periods are ignored. Possible words are:
1544 - 'i' followed by a number sets the input length, I (maximum 99).
1545 When I is set to 0, words are space-terminated.
1546 - 'o' followed by a number sets the output length, O (maximum 99).
1547 - Any other word is converted into a word followed by a period on
1548 the output. The output word consists of the input word truncated
1549 or padded out with hyphens to make its length equal to O. If O
1550 is 0, the word is output verbatim without truncating or padding.
1551 I and O are initially set to 1. When I changes, any buffered input is
1552 re-scanned according to the new I. EOF also terminates the last word.
1553 """
1554
1555 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001556 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001557 self.reset()
1558
1559 def __repr__(self):
1560 return '<SID %x>' % id(self)
1561
1562 def reset(self):
1563 self.i = 1
1564 self.o = 1
1565 self.buffer = bytearray()
1566
1567 def getstate(self):
1568 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1569 return bytes(self.buffer), i*100 + o
1570
1571 def setstate(self, state):
1572 buffer, io = state
1573 self.buffer = bytearray(buffer)
1574 i, o = divmod(io, 100)
1575 self.i, self.o = i ^ 1, o ^ 1
1576
1577 def decode(self, input, final=False):
1578 output = ''
1579 for b in input:
1580 if self.i == 0: # variable-length, terminated with period
1581 if b == ord('.'):
1582 if self.buffer:
1583 output += self.process_word()
1584 else:
1585 self.buffer.append(b)
1586 else: # fixed-length, terminate after self.i bytes
1587 self.buffer.append(b)
1588 if len(self.buffer) == self.i:
1589 output += self.process_word()
1590 if final and self.buffer: # EOF terminates the last word
1591 output += self.process_word()
1592 return output
1593
1594 def process_word(self):
1595 output = ''
1596 if self.buffer[0] == ord('i'):
1597 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1598 elif self.buffer[0] == ord('o'):
1599 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1600 else:
1601 output = self.buffer.decode('ascii')
1602 if len(output) < self.o:
1603 output += '-'*self.o # pad out with hyphens
1604 if self.o:
1605 output = output[:self.o] # truncate to output length
1606 output += '.'
1607 self.buffer = bytearray()
1608 return output
1609
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001610 codecEnabled = False
1611
1612 @classmethod
1613 def lookupTestDecoder(cls, name):
1614 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001615 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001616 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001617 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001618 incrementalencoder=None,
1619 streamreader=None, streamwriter=None,
1620 incrementaldecoder=cls)
1621
1622# Register the previous decoder for testing.
1623# Disabled by default, tests will enable it.
1624codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1625
1626
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001627class StatefulIncrementalDecoderTest(unittest.TestCase):
1628 """
1629 Make sure the StatefulIncrementalDecoder actually works.
1630 """
1631
1632 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001633 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001634 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001635 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001636 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001637 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001638 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001639 # I=0, O=6 (variable-length input, fixed-length output)
1640 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1641 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001642 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001643 # I=6, O=3 (fixed-length input > fixed-length output)
1644 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1645 # I=0, then 3; O=29, then 15 (with longer output)
1646 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1647 'a----------------------------.' +
1648 'b----------------------------.' +
1649 'cde--------------------------.' +
1650 'abcdefghijabcde.' +
1651 'a.b------------.' +
1652 '.c.------------.' +
1653 'd.e------------.' +
1654 'k--------------.' +
1655 'l--------------.' +
1656 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001657 ]
1658
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001659 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001660 # Try a few one-shot test cases.
1661 for input, eof, output in self.test_cases:
1662 d = StatefulIncrementalDecoder()
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001663 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001664
1665 # Also test an unfinished decode, followed by forcing EOF.
1666 d = StatefulIncrementalDecoder()
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001667 self.assertEqual(d.decode(b'oiabcd'), '')
1668 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001669
1670class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001671
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001672 def setUp(self):
1673 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1674 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001675 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001676
Guido van Rossumd0712812007-04-11 16:32:43 +00001677 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001678 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001679
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001680 def test_constructor(self):
1681 r = self.BytesIO(b"\xc3\xa9\n\n")
1682 b = self.BufferedReader(r, 1000)
1683 t = self.TextIOWrapper(b)
1684 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001685 self.assertEqual(t.encoding, "latin1")
1686 self.assertEqual(t.line_buffering, False)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001687 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001688 self.assertEqual(t.encoding, "utf8")
1689 self.assertEqual(t.line_buffering, True)
1690 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001691 self.assertRaises(TypeError, t.__init__, b, newline=42)
1692 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1693
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001694 def test_detach(self):
1695 r = self.BytesIO()
1696 b = self.BufferedWriter(r)
1697 t = self.TextIOWrapper(b)
1698 self.assertIs(t.detach(), b)
1699
1700 t = self.TextIOWrapper(b, encoding="ascii")
1701 t.write("howdy")
1702 self.assertFalse(r.getvalue())
1703 t.detach()
1704 self.assertEqual(r.getvalue(), b"howdy")
1705 self.assertRaises(ValueError, t.detach)
1706
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001707 def test_repr(self):
1708 raw = self.BytesIO("hello".encode("utf-8"))
1709 b = self.BufferedReader(raw)
1710 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001711 modname = self.TextIOWrapper.__module__
1712 self.assertEqual(repr(t),
1713 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1714 raw.name = "dummy"
1715 self.assertEqual(repr(t),
1716 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1717 raw.name = b"dummy"
1718 self.assertEqual(repr(t),
1719 "<%s.TextIOWrapper name=b'dummy' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001720
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001721 def test_line_buffering(self):
1722 r = self.BytesIO()
1723 b = self.BufferedWriter(r, 1000)
1724 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001725 t.write("X")
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001726 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001727 t.write("Y\nZ")
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001728 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001729 t.write("A\rB")
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001730 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001731
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001732 def test_encoding(self):
1733 # Check the encoding attribute is always set, and valid
1734 b = self.BytesIO()
1735 t = self.TextIOWrapper(b, encoding="utf8")
1736 self.assertEqual(t.encoding, "utf8")
1737 t = self.TextIOWrapper(b)
Georg Brandlab91fde2009-08-13 08:51:18 +00001738 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001739 codecs.lookup(t.encoding)
1740
1741 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001742 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001743 b = self.BytesIO(b"abc\n\xff\n")
1744 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001745 self.assertRaises(UnicodeError, t.read)
1746 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001747 b = self.BytesIO(b"abc\n\xff\n")
1748 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001749 self.assertRaises(UnicodeError, t.read)
1750 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001751 b = self.BytesIO(b"abc\n\xff\n")
1752 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001753 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001754 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001755 b = self.BytesIO(b"abc\n\xff\n")
1756 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001757 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001758
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001759 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001760 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001761 b = self.BytesIO()
1762 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001763 self.assertRaises(UnicodeError, t.write, "\xff")
1764 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001765 b = self.BytesIO()
1766 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001767 self.assertRaises(UnicodeError, t.write, "\xff")
1768 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001769 b = self.BytesIO()
1770 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001771 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001772 t.write("abc\xffdef\n")
1773 t.flush()
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001774 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001775 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001776 b = self.BytesIO()
1777 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001778 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001779 t.write("abc\xffdef\n")
1780 t.flush()
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001781 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001782
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001783 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001784 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1785
1786 tests = [
1787 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001788 [ '', input_lines ],
1789 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1790 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1791 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001792 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001793 encodings = (
1794 'utf-8', 'latin-1',
1795 'utf-16', 'utf-16-le', 'utf-16-be',
1796 'utf-32', 'utf-32-le', 'utf-32-be',
1797 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001798
Guido van Rossum8358db22007-08-18 21:39:55 +00001799 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001800 # character in TextIOWrapper._pending_line.
1801 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001802 # XXX: str.encode() should return bytes
1803 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001804 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001805 for bufsize in range(1, 10):
1806 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001807 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1808 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001809 encoding=encoding)
1810 if do_reads:
1811 got_lines = []
1812 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001813 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001814 if c2 == '':
1815 break
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001816 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001817 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001818 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001819 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001820
1821 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001822 self.assertEqual(got_line, exp_line)
1823 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00001824
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001825 def test_newlines_input(self):
1826 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001827 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1828 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001829 (None, normalized.decode("ascii").splitlines(True)),
1830 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001831 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1832 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1833 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001834 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001835 buf = self.BytesIO(testdata)
1836 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001837 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00001838 txt.seek(0)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001839 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00001840
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001841 def test_newlines_output(self):
1842 testdict = {
1843 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1844 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1845 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1846 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1847 }
1848 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1849 for newline, expected in tests:
1850 buf = self.BytesIO()
1851 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1852 txt.write("AAA\nB")
1853 txt.write("BB\nCCC\n")
1854 txt.write("X\rY\r\nZ")
1855 txt.flush()
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001856 self.assertEqual(buf.closed, False)
1857 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001858
1859 def test_destructor(self):
1860 l = []
1861 base = self.BytesIO
1862 class MyBytesIO(base):
1863 def close(self):
1864 l.append(self.getvalue())
1865 base.close(self)
1866 b = MyBytesIO()
1867 t = self.TextIOWrapper(b, encoding="ascii")
1868 t.write("abc")
1869 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001870 support.gc_collect()
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001871 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001872
1873 def test_override_destructor(self):
1874 record = []
1875 class MyTextIO(self.TextIOWrapper):
1876 def __del__(self):
1877 record.append(1)
1878 try:
1879 f = super().__del__
1880 except AttributeError:
1881 pass
1882 else:
1883 f()
1884 def close(self):
1885 record.append(2)
1886 super().close()
1887 def flush(self):
1888 record.append(3)
1889 super().flush()
1890 b = self.BytesIO()
1891 t = MyTextIO(b, encoding="ascii")
1892 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001893 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001894 self.assertEqual(record, [1, 2, 3])
1895
1896 def test_error_through_destructor(self):
1897 # Test that the exception state is not modified by a destructor,
1898 # even if close() fails.
1899 rawio = self.CloseFailureIO()
1900 def f():
1901 self.TextIOWrapper(rawio).xyzzy
1902 with support.captured_output("stderr") as s:
1903 self.assertRaises(AttributeError, f)
1904 s = s.getvalue().strip()
1905 if s:
1906 # The destructor *may* have printed an unraisable error, check it
1907 self.assertEqual(len(s.splitlines()), 1)
Georg Brandlab91fde2009-08-13 08:51:18 +00001908 self.assertTrue(s.startswith("Exception IOError: "), s)
1909 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00001910
Guido van Rossum9b76da62007-04-11 01:09:03 +00001911 # Systematic tests of the text I/O API
1912
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001913 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001914 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1915 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001916 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001917 f._CHUNK_SIZE = chunksize
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001918 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001919 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001920 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001921 f._CHUNK_SIZE = chunksize
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001922 self.assertEqual(f.tell(), 0)
1923 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00001924 cookie = f.tell()
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001925 self.assertEqual(f.seek(0), 0)
1926 self.assertEqual(f.read(None), "abc")
Benjamin Peterson6b59f772009-12-13 19:30:15 +00001927 f.seek(0)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001928 self.assertEqual(f.read(2), "ab")
1929 self.assertEqual(f.read(1), "c")
1930 self.assertEqual(f.read(1), "")
1931 self.assertEqual(f.read(), "")
1932 self.assertEqual(f.tell(), cookie)
1933 self.assertEqual(f.seek(0), 0)
1934 self.assertEqual(f.seek(0, 2), cookie)
1935 self.assertEqual(f.write("def"), 3)
1936 self.assertEqual(f.seek(cookie), cookie)
1937 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00001938 if enc.startswith("utf"):
1939 self.multi_line_test(f, enc)
1940 f.close()
1941
1942 def multi_line_test(self, f, enc):
1943 f.seek(0)
1944 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001945 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001946 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00001947 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001948 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00001949 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001950 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001951 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001952 wlines.append((f.tell(), line))
1953 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001954 f.seek(0)
1955 rlines = []
1956 while True:
1957 pos = f.tell()
1958 line = f.readline()
1959 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00001960 break
1961 rlines.append((pos, line))
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001962 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001963
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001964 def test_telling(self):
1965 f = self.open(support.TESTFN, "w+", encoding="utf8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001966 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001967 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001968 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001969 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001970 p2 = f.tell()
1971 f.seek(0)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001972 self.assertEqual(f.tell(), p0)
1973 self.assertEqual(f.readline(), "\xff\n")
1974 self.assertEqual(f.tell(), p1)
1975 self.assertEqual(f.readline(), "\xff\n")
1976 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001977 f.seek(0)
1978 for line in f:
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001979 self.assertEqual(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001980 self.assertRaises(IOError, f.tell)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001981 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001982 f.close()
1983
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001984 def test_seeking(self):
1985 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00001986 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001987 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00001988 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001989 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001990 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00001991 suffix = bytes(u_suffix.encode("utf-8"))
1992 line = prefix + suffix
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001993 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001994 f.write(line*2)
1995 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001996 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001997 s = f.read(prefix_size)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00001998 self.assertEqual(s, str(prefix, "ascii"))
1999 self.assertEqual(f.tell(), prefix_size)
2000 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002001
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002002 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002003 # Regression test for a specific bug
2004 data = b'\xe0\xbf\xbf\n'
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002005 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00002006 f.write(data)
2007 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002008 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00002009 f._CHUNK_SIZE # Just test that it exists
2010 f._CHUNK_SIZE = 2
2011 f.readline()
2012 f.tell()
2013
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002014 def test_seek_and_tell(self):
2015 #Test seek/tell using the StatefulIncrementalDecoder.
2016 # Make test faster by doing smaller seeks
2017 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002018
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002019 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002020 """Tell/seek to various points within a data stream and ensure
2021 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002022 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002023 f.write(data)
2024 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002025 f = self.open(support.TESTFN, encoding='test_decoder')
2026 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002027 decoded = f.read()
2028 f.close()
2029
Neal Norwitze2b07052008-03-18 19:52:05 +00002030 for i in range(min_pos, len(decoded) + 1): # seek positions
2031 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002032 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002033 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002034 cookie = f.tell()
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002035 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002036 f.seek(cookie)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002037 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002038 f.close()
2039
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002040 # Enable the test decoder.
2041 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002042
2043 # Run the tests.
2044 try:
2045 # Try each test case.
2046 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002047 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002048
2049 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002050 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2051 offset = CHUNK_SIZE - len(input)//2
2052 prefix = b'.'*offset
2053 # Don't bother seeking into the prefix (takes too long).
2054 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002055 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002056
2057 # Ensure our test decoder won't interfere with subsequent tests.
2058 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002059 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002060
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002061 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002062 data = "1234567890"
2063 tests = ("utf-16",
2064 "utf-16-le",
2065 "utf-16-be",
2066 "utf-32",
2067 "utf-32-le",
2068 "utf-32-be")
2069 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002070 buf = self.BytesIO()
2071 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002072 # Check if the BOM is written only once (see issue1753).
2073 f.write(data)
2074 f.write(data)
2075 f.seek(0)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002076 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002077 f.seek(0)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002078 self.assertEqual(f.read(), data * 2)
2079 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002080
Benjamin Petersona1b49012009-03-31 23:11:32 +00002081 def test_unreadable(self):
2082 class UnReadable(self.BytesIO):
2083 def readable(self):
2084 return False
2085 txt = self.TextIOWrapper(UnReadable())
2086 self.assertRaises(IOError, txt.read)
2087
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002088 def test_read_one_by_one(self):
2089 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002090 reads = ""
2091 while True:
2092 c = txt.read(1)
2093 if not c:
2094 break
2095 reads += c
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002096 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002097
Benjamin Peterson6b59f772009-12-13 19:30:15 +00002098 def test_readlines(self):
2099 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2100 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2101 txt.seek(0)
2102 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2103 txt.seek(0)
2104 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2105
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002106 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002107 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002108 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002109 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002110 reads = ""
2111 while True:
2112 c = txt.read(128)
2113 if not c:
2114 break
2115 reads += c
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002116 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002117
2118 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002119 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002120
2121 # read one char at a time
2122 reads = ""
2123 while True:
2124 c = txt.read(1)
2125 if not c:
2126 break
2127 reads += c
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002128 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002129
2130 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002131 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002132 txt._CHUNK_SIZE = 4
2133
2134 reads = ""
2135 while True:
2136 c = txt.read(4)
2137 if not c:
2138 break
2139 reads += c
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002140 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002141
2142 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002143 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002144 txt._CHUNK_SIZE = 4
2145
2146 reads = txt.read(4)
2147 reads += txt.read(4)
2148 reads += txt.readline()
2149 reads += txt.readline()
2150 reads += txt.readline()
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002151 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002152
2153 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002154 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002155 txt._CHUNK_SIZE = 4
2156
2157 reads = txt.read(4)
2158 reads += txt.read()
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002159 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002160
2161 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002162 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002163 txt._CHUNK_SIZE = 4
2164
2165 reads = txt.read(4)
2166 pos = txt.tell()
2167 txt.seek(0)
2168 txt.seek(pos)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002169 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002170
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002171 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002172 buffer = self.BytesIO(self.testdata)
2173 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002174
2175 self.assertEqual(buffer.seekable(), txt.seekable())
2176
Antoine Pitroue4501852009-05-14 18:55:55 +00002177 def test_append_bom(self):
2178 # The BOM is not written again when appending to a non-empty file
2179 filename = support.TESTFN
2180 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2181 with self.open(filename, 'w', encoding=charset) as f:
2182 f.write('aaa')
2183 pos = f.tell()
2184 with self.open(filename, 'rb') as f:
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002185 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002186
2187 with self.open(filename, 'a', encoding=charset) as f:
2188 f.write('xxx')
2189 with self.open(filename, 'rb') as f:
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002190 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002191
2192 def test_seek_bom(self):
2193 # Same test, but when seeking manually
2194 filename = support.TESTFN
2195 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2196 with self.open(filename, 'w', encoding=charset) as f:
2197 f.write('aaa')
2198 pos = f.tell()
2199 with self.open(filename, 'r+', encoding=charset) as f:
2200 f.seek(pos)
2201 f.write('zzz')
2202 f.seek(0)
2203 f.write('bbb')
2204 with self.open(filename, 'rb') as f:
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002205 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002206
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002207 def test_errors_property(self):
2208 with self.open(support.TESTFN, "w") as f:
2209 self.assertEqual(f.errors, "strict")
2210 with self.open(support.TESTFN, "w", errors="replace") as f:
2211 self.assertEqual(f.errors, "replace")
2212
Antoine Pitroue4501852009-05-14 18:55:55 +00002213
Amaury Forgeot d'Arcaf0312a2009-08-29 23:19:16 +00002214 def test_threads_write(self):
2215 # Issue6750: concurrent writes could duplicate data
2216 event = threading.Event()
2217 with self.open(support.TESTFN, "w", buffering=1) as f:
2218 def run(n):
2219 text = "Thread%03d\n" % n
2220 event.wait()
2221 f.write(text)
2222 threads = [threading.Thread(target=lambda n=x: run(n))
2223 for x in range(20)]
2224 for t in threads:
2225 t.start()
2226 time.sleep(0.02)
2227 event.set()
2228 for t in threads:
2229 t.join()
2230 with self.open(support.TESTFN) as f:
2231 content = f.read()
2232 for n in range(20):
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002233 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcaf0312a2009-08-29 23:19:16 +00002234
Antoine Pitroufaf90072010-05-03 16:58:19 +00002235 def test_flush_error_on_close(self):
2236 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2237 def bad_flush():
2238 raise IOError()
2239 txt.flush = bad_flush
2240 self.assertRaises(IOError, txt.close) # exception not swallowed
2241
2242 def test_multi_close(self):
2243 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2244 txt.close()
2245 txt.close()
2246 txt.close()
2247 self.assertRaises(ValueError, txt.flush)
2248
Antoine Pitrou6cfc5122010-12-21 21:26:09 +00002249 def test_readonly_attributes(self):
2250 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2251 buf = self.BytesIO(self.testdata)
2252 with self.assertRaises(AttributeError):
2253 txt.buffer = buf
2254
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002255class CTextIOWrapperTest(TextIOWrapperTest):
2256
2257 def test_initialization(self):
2258 r = self.BytesIO(b"\xc3\xa9\n\n")
2259 b = self.BufferedReader(r, 1000)
2260 t = self.TextIOWrapper(b)
2261 self.assertRaises(TypeError, t.__init__, b, newline=42)
2262 self.assertRaises(ValueError, t.read)
2263 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2264 self.assertRaises(ValueError, t.read)
2265
2266 def test_garbage_collection(self):
2267 # C TextIOWrapper objects are collected, and collecting them flushes
2268 # all data to disk.
2269 # The Python version has __del__, so it ends in gc.garbage instead.
2270 rawio = io.FileIO(support.TESTFN, "wb")
2271 b = self.BufferedWriter(rawio)
2272 t = self.TextIOWrapper(b, encoding="ascii")
2273 t.write("456def")
2274 t.x = t
2275 wr = weakref.ref(t)
2276 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002277 support.gc_collect()
Georg Brandlab91fde2009-08-13 08:51:18 +00002278 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002279 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002280 self.assertEqual(f.read(), b"456def")
2281
2282class PyTextIOWrapperTest(TextIOWrapperTest):
2283 pass
2284
2285
2286class IncrementalNewlineDecoderTest(unittest.TestCase):
2287
2288 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002289 # UTF-8 specific tests for a newline decoder
2290 def _check_decode(b, s, **kwargs):
2291 # We exercise getstate() / setstate() as well as decode()
2292 state = decoder.getstate()
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002293 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002294 decoder.setstate(state)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002295 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002296
Antoine Pitrou180a3362008-12-14 16:36:46 +00002297 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002298
Antoine Pitrou180a3362008-12-14 16:36:46 +00002299 _check_decode(b'\xe8', "")
2300 _check_decode(b'\xa2', "")
2301 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002302
Antoine Pitrou180a3362008-12-14 16:36:46 +00002303 _check_decode(b'\xe8', "")
2304 _check_decode(b'\xa2', "")
2305 _check_decode(b'\x88', "\u8888")
2306
2307 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002308 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2309
Antoine Pitrou180a3362008-12-14 16:36:46 +00002310 decoder.reset()
2311 _check_decode(b'\n', "\n")
2312 _check_decode(b'\r', "")
2313 _check_decode(b'', "\n", final=True)
2314 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002315
Antoine Pitrou180a3362008-12-14 16:36:46 +00002316 _check_decode(b'\r', "")
2317 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002318
Antoine Pitrou180a3362008-12-14 16:36:46 +00002319 _check_decode(b'\r\r\n', "\n\n")
2320 _check_decode(b'\r', "")
2321 _check_decode(b'\r', "\n")
2322 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002323
Antoine Pitrou180a3362008-12-14 16:36:46 +00002324 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2325 _check_decode(b'\xe8\xa2\x88', "\u8888")
2326 _check_decode(b'\n', "\n")
2327 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2328 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002329
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002330 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002331 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002332 if encoding is not None:
2333 encoder = codecs.getincrementalencoder(encoding)()
2334 def _decode_bytewise(s):
2335 # Decode one byte at a time
2336 for b in encoder.encode(s):
2337 result.append(decoder.decode(bytes([b])))
2338 else:
2339 encoder = None
2340 def _decode_bytewise(s):
2341 # Decode one char at a time
2342 for c in s:
2343 result.append(decoder.decode(c))
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002344 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002345 _decode_bytewise("abc\n\r")
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002346 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00002347 _decode_bytewise("\nabc")
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002348 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002349 _decode_bytewise("abc\r")
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002350 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002351 _decode_bytewise("abc")
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002352 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002353 _decode_bytewise("abc\r")
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002354 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002355 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002356 input = "abc"
2357 if encoder is not None:
2358 encoder.reset()
2359 input = encoder.encode(input)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002360 self.assertEqual(decoder.decode(input), "abc")
2361 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002362
2363 def test_newline_decoder(self):
2364 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002365 # None meaning the IncrementalNewlineDecoder takes unicode input
2366 # rather than bytes input
2367 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002368 'utf-16', 'utf-16-le', 'utf-16-be',
2369 'utf-32', 'utf-32-le', 'utf-32-be',
2370 )
2371 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002372 decoder = enc and codecs.getincrementaldecoder(enc)()
2373 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2374 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002375 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002376 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2377 self.check_newline_decoding_utf8(decoder)
2378
Antoine Pitrou66913e22009-03-06 23:40:56 +00002379 def test_newline_bytes(self):
2380 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2381 def _check(dec):
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002382 self.assertEqual(dec.newlines, None)
2383 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2384 self.assertEqual(dec.newlines, None)
2385 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2386 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00002387 dec = self.IncrementalNewlineDecoder(None, translate=False)
2388 _check(dec)
2389 dec = self.IncrementalNewlineDecoder(None, translate=True)
2390 _check(dec)
2391
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002392class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2393 pass
2394
2395class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2396 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002397
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002398
Guido van Rossum01a27522007-03-07 01:00:12 +00002399# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002400
Guido van Rossum5abbf752007-08-27 17:39:33 +00002401class MiscIOTest(unittest.TestCase):
2402
Barry Warsaw40e82462008-11-20 20:14:50 +00002403 def tearDown(self):
2404 support.unlink(support.TESTFN)
2405
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002406 def test___all__(self):
2407 for name in self.io.__all__:
2408 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002409 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002410 if name == "open":
2411 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002412 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002413 self.assertTrue(issubclass(obj, Exception), name)
2414 elif not name.startswith("SEEK_"):
2415 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002416
Barry Warsaw40e82462008-11-20 20:14:50 +00002417 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002418 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002419 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002420 f.close()
2421
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002422 f = self.open(support.TESTFN, "U")
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002423 self.assertEqual(f.name, support.TESTFN)
2424 self.assertEqual(f.buffer.name, support.TESTFN)
2425 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2426 self.assertEqual(f.mode, "U")
2427 self.assertEqual(f.buffer.mode, "rb")
2428 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002429 f.close()
2430
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002431 f = self.open(support.TESTFN, "w+")
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002432 self.assertEqual(f.mode, "w+")
2433 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2434 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002435
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002436 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +00002437 self.assertEqual(g.mode, "wb")
2438 self.assertEqual(g.raw.mode, "wb")
2439 self.assertEqual(g.name, f.fileno())
2440 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00002441 f.close()
2442 g.close()
2443
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002444 def test_io_after_close(self):
2445 for kwargs in [
2446 {"mode": "w"},
2447 {"mode": "wb"},
2448 {"mode": "w", "buffering": 1},
2449 {"mode": "w", "buffering": 2},
2450 {"mode": "wb", "buffering": 0},
2451 {"mode": "r"},
2452 {"mode": "rb"},
2453 {"mode": "r", "buffering": 1},
2454 {"mode": "r", "buffering": 2},
2455 {"mode": "rb", "buffering": 0},
2456 {"mode": "w+"},
2457 {"mode": "w+b"},
2458 {"mode": "w+", "buffering": 1},
2459 {"mode": "w+", "buffering": 2},
2460 {"mode": "w+b", "buffering": 0},
2461 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002462 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002463 f.close()
2464 self.assertRaises(ValueError, f.flush)
2465 self.assertRaises(ValueError, f.fileno)
2466 self.assertRaises(ValueError, f.isatty)
2467 self.assertRaises(ValueError, f.__iter__)
2468 if hasattr(f, "peek"):
2469 self.assertRaises(ValueError, f.peek, 1)
2470 self.assertRaises(ValueError, f.read)
2471 if hasattr(f, "read1"):
2472 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02002473 if hasattr(f, "readall"):
2474 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002475 if hasattr(f, "readinto"):
2476 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2477 self.assertRaises(ValueError, f.readline)
2478 self.assertRaises(ValueError, f.readlines)
2479 self.assertRaises(ValueError, f.seek, 0)
2480 self.assertRaises(ValueError, f.tell)
2481 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002482 self.assertRaises(ValueError, f.write,
2483 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002484 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002485 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002486
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002487 def test_blockingioerror(self):
2488 # Various BlockingIOError issues
2489 self.assertRaises(TypeError, self.BlockingIOError)
2490 self.assertRaises(TypeError, self.BlockingIOError, 1)
2491 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2492 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2493 b = self.BlockingIOError(1, "")
2494 self.assertEqual(b.characters_written, 0)
2495 class C(str):
2496 pass
2497 c = C("")
2498 b = self.BlockingIOError(1, c)
2499 c.b = b
2500 b.c = c
2501 wr = weakref.ref(c)
2502 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002503 support.gc_collect()
Georg Brandlab91fde2009-08-13 08:51:18 +00002504 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002505
2506 def test_abcs(self):
2507 # Test the visible base classes are ABCs.
2508 self.assertTrue(isinstance(self.IOBase, abc.ABCMeta))
2509 self.assertTrue(isinstance(self.RawIOBase, abc.ABCMeta))
2510 self.assertTrue(isinstance(self.BufferedIOBase, abc.ABCMeta))
2511 self.assertTrue(isinstance(self.TextIOBase, abc.ABCMeta))
2512
2513 def _check_abc_inheritance(self, abcmodule):
2514 with self.open(support.TESTFN, "wb", buffering=0) as f:
2515 self.assertTrue(isinstance(f, abcmodule.IOBase))
2516 self.assertTrue(isinstance(f, abcmodule.RawIOBase))
2517 self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
2518 self.assertFalse(isinstance(f, abcmodule.TextIOBase))
2519 with self.open(support.TESTFN, "wb") as f:
2520 self.assertTrue(isinstance(f, abcmodule.IOBase))
2521 self.assertFalse(isinstance(f, abcmodule.RawIOBase))
2522 self.assertTrue(isinstance(f, abcmodule.BufferedIOBase))
2523 self.assertFalse(isinstance(f, abcmodule.TextIOBase))
2524 with self.open(support.TESTFN, "w") as f:
2525 self.assertTrue(isinstance(f, abcmodule.IOBase))
2526 self.assertFalse(isinstance(f, abcmodule.RawIOBase))
2527 self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
2528 self.assertTrue(isinstance(f, abcmodule.TextIOBase))
2529
2530 def test_abc_inheritance(self):
2531 # Test implementations inherit from their respective ABCs
2532 self._check_abc_inheritance(self)
2533
2534 def test_abc_inheritance_official(self):
2535 # Test implementations inherit from the official ABCs of the
2536 # baseline "io" module.
2537 self._check_abc_inheritance(io)
2538
2539class CMiscIOTest(MiscIOTest):
2540 io = io
2541
2542class PyMiscIOTest(MiscIOTest):
2543 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002544
Antoine Pitrou16b11de2010-08-21 19:17:57 +00002545
2546@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2547class SignalsTest(unittest.TestCase):
2548
2549 def setUp(self):
2550 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2551
2552 def tearDown(self):
2553 signal.signal(signal.SIGALRM, self.oldalrm)
2554
2555 def alarm_interrupt(self, sig, frame):
2556 1/0
2557
2558 @unittest.skipUnless(threading, 'Threading required for this test.')
2559 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2560 """Check that a partial write, when it gets interrupted, properly
2561 invokes the signal handler."""
2562 read_results = []
2563 def _read():
2564 s = os.read(r, 1)
2565 read_results.append(s)
2566 t = threading.Thread(target=_read)
2567 t.daemon = True
2568 r, w = os.pipe()
2569 try:
2570 wio = self.io.open(w, **fdopen_kwargs)
2571 t.start()
2572 signal.alarm(1)
2573 # Fill the pipe enough that the write will be blocking.
2574 # It will be interrupted by the timer armed above. Since the
2575 # other thread has read one byte, the low-level write will
2576 # return with a successful (partial) result rather than an EINTR.
2577 # The buffered IO layer must check for pending signal
2578 # handlers, which in this case will invoke alarm_interrupt().
2579 self.assertRaises(ZeroDivisionError,
2580 wio.write, item * (1024 * 1024))
2581 t.join()
2582 # We got one byte, get another one and check that it isn't a
2583 # repeat of the first one.
2584 read_results.append(os.read(r, 1))
2585 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2586 finally:
2587 os.close(w)
2588 os.close(r)
2589 # This is deliberate. If we didn't close the file descriptor
2590 # before closing wio, wio would try to flush its internal
2591 # buffer, and block again.
2592 try:
2593 wio.close()
2594 except IOError as e:
2595 if e.errno != errno.EBADF:
2596 raise
2597
2598 def test_interrupted_write_unbuffered(self):
2599 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2600
2601 def test_interrupted_write_buffered(self):
2602 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2603
2604 def test_interrupted_write_text(self):
2605 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2606
Antoine Pitrou976157f2010-12-03 19:21:49 +00002607 def check_reentrant_write(self, data, **fdopen_kwargs):
2608 def on_alarm(*args):
2609 # Will be called reentrantly from the same thread
2610 wio.write(data)
2611 1/0
2612 signal.signal(signal.SIGALRM, on_alarm)
2613 r, w = os.pipe()
2614 wio = self.io.open(w, **fdopen_kwargs)
2615 try:
2616 signal.alarm(1)
2617 # Either the reentrant call to wio.write() fails with RuntimeError,
2618 # or the signal handler raises ZeroDivisionError.
2619 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
2620 while 1:
2621 for i in range(100):
2622 wio.write(data)
2623 wio.flush()
2624 # Make sure the buffer doesn't fill up and block further writes
2625 os.read(r, len(data) * 100)
2626 finally:
2627 wio.close()
2628 os.close(r)
2629
2630 def test_reentrant_write_buffered(self):
2631 self.check_reentrant_write(b"xy", mode="wb")
2632
2633 def test_reentrant_write_text(self):
2634 self.check_reentrant_write("xy", mode="w", encoding="ascii")
2635
2636
Antoine Pitrou16b11de2010-08-21 19:17:57 +00002637class CSignalsTest(SignalsTest):
2638 io = io
2639
2640class PySignalsTest(SignalsTest):
2641 io = pyio
2642
Antoine Pitrou976157f2010-12-03 19:21:49 +00002643 # Handling reentrancy issues would slow down _pyio even more, so the
2644 # tests are disabled.
2645 test_reentrant_write_buffered = None
2646 test_reentrant_write_text = None
2647
Antoine Pitrou16b11de2010-08-21 19:17:57 +00002648
Guido van Rossum28524c72007-02-27 05:47:44 +00002649def test_main():
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002650 tests = (CIOTest, PyIOTest,
2651 CBufferedReaderTest, PyBufferedReaderTest,
2652 CBufferedWriterTest, PyBufferedWriterTest,
2653 CBufferedRWPairTest, PyBufferedRWPairTest,
2654 CBufferedRandomTest, PyBufferedRandomTest,
2655 StatefulIncrementalDecoderTest,
2656 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2657 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitrou16b11de2010-08-21 19:17:57 +00002658 CMiscIOTest, PyMiscIOTest,
2659 CSignalsTest, PySignalsTest,
2660 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002661
2662 # Put the namespaces of the IO module we are testing and some useful mock
2663 # classes in the __dict__ of each test.
2664 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitroue5e75c62010-09-14 18:53:07 +00002665 MockNonBlockWriterIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002666 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2667 c_io_ns = {name : getattr(io, name) for name in all_members}
2668 py_io_ns = {name : getattr(pyio, name) for name in all_members}
2669 globs = globals()
2670 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2671 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2672 # Avoid turning open into a bound method.
2673 py_io_ns["open"] = pyio.OpenWrapper
2674 for test in tests:
2675 if test.__name__.startswith("C"):
2676 for name, obj in c_io_ns.items():
2677 setattr(test, name, obj)
2678 elif test.__name__.startswith("Py"):
2679 for name, obj in py_io_ns.items():
2680 setattr(test, name, obj)
2681
2682 support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002683
2684if __name__ == "__main__":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002685 test_main()