blob: 23edee69ed890ba6a93d933151d8d2e526bb9073 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as an attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Antoine Pitrou712cb732013-12-21 15:51:54 +010038from test.script_helper import assert_python_ok
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010047try:
48 import fcntl
49except ImportError:
50 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000051
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000052def _default_chunk_size():
53 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000054 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000055 return f._CHUNK_SIZE
56
57
class MockRawIOWithoutRead:
    """Raw I/O mock that deliberately provides no read() method.

    Data is served from a queue of canned chunks through readinto() only,
    which lets tests exercise the default RawIO.read() built on top of
    readinto().  Writes are recorded instead of being performed.
    """

    def __init__(self, read_stack=()):
        # Bookkeeping counters first, then the two queues.
        self._reads = 0
        self._extraneous_reads = 0
        self._write_stack = []
        self._read_stack = list(read_stack)

    def write(self, b):
        # Keep an immutable snapshot of what was written.
        snapshot = bytes(b)
        self._write_stack.append(snapshot)
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        # Not a real position, but something has to be returned.
        return 0

    def tell(self):
        # Same remark as for seek().
        return 0

    def readinto(self, buf):
        """Copy the next queued chunk (or part of it) into *buf*.

        Returns the number of bytes copied, 0 when the queue is empty, or
        None when the next queued item is None.
        """
        self._reads += 1
        capacity = len(buf)
        if not self._read_stack:
            # Nothing left to serve: count the call as extraneous.
            self._extraneous_reads += 1
            return 0
        chunk = self._read_stack[0]
        if chunk is None:
            del self._read_stack[0]
            return None
        size = len(chunk)
        if size > capacity:
            # Fill the whole buffer and keep the remainder queued.
            buf[:] = chunk[:capacity]
            self._read_stack[0] = chunk[capacity:]
            return capacity
        # The chunk fits entirely: consume it.
        del self._read_stack[0]
        buf[:size] = chunk
        return size

    def truncate(self, pos=None):
        return pos
113
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead layered on RawIOBase from the C io module."""
116
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead layered on RawIOBase from the Python _pyio module."""
119
120
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead extended with an explicit read() method."""

    def read(self, n=None):
        """Pop and return the next queued chunk, or b"" once the queue is empty.

        *n* is accepted but not used: the queued item is returned as is.
        Every call increments ``_reads``; calls made on an empty queue also
        increment ``_extraneous_reads``.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted queue counts as an extraneous read.  Anything
            # else (KeyboardInterrupt, a genuine bug) must propagate instead
            # of being silently turned into b"".
            self._extraneous_reads += 1
            return b""
130
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO layered on RawIOBase from the C io module."""
133
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO layered on RawIOBase from the Python _pyio module."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000136
Guido van Rossuma9e20242007-03-08 00:43:48 +0000137
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods report deliberately bogus results."""

    def write(self, b):
        # Report twice the count the parent returned.
        recorded = super().write(b)
        return recorded * 2

    def read(self, n=None):
        # Return the parent's chunk repeated twice.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Let the parent do the real transfer, then report five times the
        # buffer length.
        super().readinto(buf)
        return 5 * len(buf)
154
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO layered on RawIOBase from the C io module."""
157
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO layered on RawIOBase from the Python _pyio module."""
160
161
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() raises OSError; later calls do nothing."""

    closed = 0

    def close(self):
        if self.closed:
            return
        # Set the flag before raising so that the failure happens only once.
        self.closed = 1
        raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000169
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO layered on RawIOBase from the C io module."""
172
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO layered on RawIOBase from the Python _pyio module."""
175
176
class MockFileIO:
    """Mixin that logs the result size of every read() and readinto() call.

    The sizes are appended to ``read_history``.  The class relies on the next
    class in the MRO to provide the actual stream and to accept the initial
    data as its constructor argument.
    """

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        # A None result is logged as None, anything else by its length.
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000192
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO recording reads made on the C io module's BytesIO."""
Guido van Rossuma9e20242007-03-08 00:43:48 +0000195
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO recording reads made on the Python _pyio module's BytesIO."""
198
199
class MockUnseekableIO:
    """Mixin that makes a stream report itself as, and act as, unseekable.

    The exception type raised by seek() and tell() is looked up as
    ``self.UnsupportedOperation``, which this class does not define itself.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def tell(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error
209
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """Unseekable BytesIO from the C io module."""

    # Exception type raised by the inherited seek() and tell().
    UnsupportedOperation = io.UnsupportedOperation
212
class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """Unseekable BytesIO from the Python _pyio module."""

    # Exception type raised by the inherited seek() and tell().
    UnsupportedOperation = pyio.UnsupportedOperation
215
216
class MockNonBlockWriterIO:
    """Writable raw-stream mock that can simulate a write that would block.

    Written data is collected in memory.  After block_on(char), a write whose
    data contains *char* is cut short just before it; a write whose data
    starts with *char* returns None and clears the blocker.
    """

    def __init__(self):
        self._blocker_char = None
        self._write_stack = []

    def pop_written(self):
        """Return all data written so far as one bytes object and forget it."""
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Record *b*, honouring the blocker set by block_on().

        Returns the number of bytes recorded, or None when the data starts
        with the blocker.
        """
        data = bytes(b)
        blocker = self._blocker_char
        if blocker:
            where = data.find(blocker)
            if where > 0:
                # write data up to the first blocker
                self._write_stack.append(data[:where])
                return where
            if where == 0:
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
            # Blocker absent from the data: fall through to a full write.
        self._write_stack.append(data)
        return len(data)
260
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO layered on RawIOBase from the C io module."""

    # BlockingIOError as exposed by the C io module.
    BlockingIOError = io.BlockingIOError
263
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO layered on RawIOBase from the Python _pyio module."""

    # BlockingIOError as exposed by the Python _pyio module.
    BlockingIOError = pyio.BlockingIOError
266
Guido van Rossuma9e20242007-03-08 00:43:48 +0000267
class IOTest(unittest.TestCase):
    """Tests for open(), FileIO, BytesIO and the abstract IO base classes.

    The objects under test are reached through attributes of ``self``
    (``self.open``, ``self.FileIO``, ``self.BytesIO``, ``self.IOBase``,
    ``self.MockRawIOWithoutRead``, ...) which this class does not define.
    NOTE(review): presumably they are bound to the C or Python implementation
    elsewhere in this file -- confirm.
    """

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        """Run a fixed sequence of write/seek/truncate checks on *f*.

        The sequence leaves b"hello world\\n" as the content of *f*.
        """
        self.assertEqual(f.write(b"blah."), 5)
        # truncate() must not move the file position.
        f.truncate(0)
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Run a fixed sequence of read/seek checks on *f*.

        *f* must contain b"hello world\\n".  With *buffered* true, read()
        without a size argument is checked as well.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by the large-file checks.
    LARGE = 2**31

    def large_file_ops(self, f):
        """Check seek/tell/write/truncate/read on *f* around offset LARGE.

        *f* must be both readable and writable.
        """
        # Use unittest assertions rather than ``assert`` statements so the
        # preconditions are still checked when running under -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        # truncate() must not move the file position.
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        exc = self.UnsupportedOperation
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(exc, fp.read)
                self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "wb", buffering=0) as fp:
            self.assertRaises(exc, fp.read)
            self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "rb", buffering=0) as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(exc, fp.write, "blah")
            self.assertRaises(exc, fp.writelines, ["blah\n"])
            # Non-zero seeking from current or end pos
            self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
            self.assertRaises(exc, fp.seek, -1, self.SEEK_END)

    def test_open_handles_NUL_chars(self):
        fn_with_NUL = 'foo\0bar'
        self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
        self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')

    def test_raw_file_io(self):
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            support.requires(
                'largefile',
                'test requires %s bytes and a long time to run' % self.LARGE)
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                # The file must be closed even when the body raised.
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        with support.check_warnings(('', ResourceWarning)):
            f = MyFileIO(support.TESTFN, "wb")
            f.write(b"xxx")
            del f
            support.gc_collect()
            self.assertEqual(record, [1, 2, 3])
            with self.open(support.TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        """Check that destroying a *base* subclass instance calls
        __del__, close() and flush(), in that order."""
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        a = array.array('i', range(10))
        n = len(a.tobytes())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        with support.check_warnings(('', ResourceWarning)):
            f = self.FileIO(support.TESTFN, "wb")
            f.write(b"abcxxx")
            # Put the object in a reference cycle with itself.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise OSError()
        f.flush = bad_flush
        self.assertRaises(OSError, f.close) # exception not swallowed
        self.assertTrue(f.closed)

    def test_multi_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")

    def test_types_have_dict(self):
        test = (
            self.IOBase(),
            self.RawIOBase(),
            self.TextIOBase(),
            self.StringIO(),
            self.BytesIO()
        )
        for obj in test:
            self.assertTrue(hasattr(obj, "__dict__"))

    def test_opener(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        fd = os.open(support.TESTFN, os.O_RDONLY)
        def opener(path, flags):
            # Ignore the requested path and hand back the descriptor
            # opened above.
            return fd
        with self.open("non-existent", "r", opener=opener) as f:
            self.assertEqual(f.read(), "egg\n")

    def test_fileio_closefd(self):
        # Issue #4841
        with self.open(__file__, 'rb') as f1, \
             self.open(__file__, 'rb') as f2:
            fileio = self.FileIO(f1.fileno(), closefd=False)
            # .__init__() must not close f1
            fileio.__init__(f2.fileno(), closefd=False)
            f1.readline()
            # .close() must not close f2
            fileio.close()
            f2.readline()
655
656
class CIOTest(IOTest):
    """IOTest extended with a regression test for IOBase finalization."""

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        # Put the object in a reference cycle with itself.
        obj.obj = obj
        wr = weakref.ref(obj)
        # Drop the local references to both the class and the instance.
        del MyIO
        del obj
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000676
class PyIOTest(IOTest):
    """IOTest subclass that adds no tests of its own."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000679
Guido van Rossuma9e20242007-03-08 00:43:48 +0000680
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000681class CommonBufferedTests:
682 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
683
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000684 def test_detach(self):
685 raw = self.MockRawIO()
686 buf = self.tp(raw)
687 self.assertIs(buf.detach(), raw)
688 self.assertRaises(ValueError, buf.detach)
689
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000690 def test_fileno(self):
691 rawio = self.MockRawIO()
692 bufio = self.tp(rawio)
693
Ezio Melottib3aedd42010-11-20 19:04:17 +0000694 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000695
    @unittest.skip('test having existential crisis')
    def test_no_fileno(self):
        # Placeholder that is always skipped (see the decorator above).
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass
701
702 def test_invalid_args(self):
703 rawio = self.MockRawIO()
704 bufio = self.tp(rawio)
705 # Invalid whence
706 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +0200707 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000708
709 def test_override_destructor(self):
710 tp = self.tp
711 record = []
712 class MyBufferedIO(tp):
713 def __del__(self):
714 record.append(1)
715 try:
716 f = super().__del__
717 except AttributeError:
718 pass
719 else:
720 f()
721 def close(self):
722 record.append(2)
723 super().close()
724 def flush(self):
725 record.append(3)
726 super().flush()
727 rawio = self.MockRawIO()
728 bufio = MyBufferedIO(rawio)
729 writable = bufio.writable()
730 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000731 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000732 if writable:
733 self.assertEqual(record, [1, 2, 3])
734 else:
735 self.assertEqual(record, [1, 2])
736
737 def test_context_manager(self):
738 # Test usability as a context manager
739 rawio = self.MockRawIO()
740 bufio = self.tp(rawio)
741 def _with():
742 with bufio:
743 pass
744 _with()
745 # bufio should now be closed, and using it a second time should raise
746 # a ValueError.
747 self.assertRaises(ValueError, _with)
748
749 def test_error_through_destructor(self):
750 # Test that the exception state is not modified by a destructor,
751 # even if close() fails.
752 rawio = self.CloseFailureIO()
753 def f():
754 self.tp(rawio).xyzzy
755 with support.captured_output("stderr") as s:
756 self.assertRaises(AttributeError, f)
757 s = s.getvalue().strip()
758 if s:
759 # The destructor *may* have printed an unraisable error, check it
760 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200761 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000762 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000763
Antoine Pitrou716c4442009-05-23 19:04:03 +0000764 def test_repr(self):
765 raw = self.MockRawIO()
766 b = self.tp(raw)
767 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
768 self.assertEqual(repr(b), "<%s>" % clsname)
769 raw.name = "dummy"
770 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
771 raw.name = b"dummy"
772 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
773
Antoine Pitrou6be88762010-05-03 16:48:20 +0000774 def test_flush_error_on_close(self):
775 raw = self.MockRawIO()
776 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200777 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000778 raw.flush = bad_flush
779 b = self.tp(raw)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200780 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600781 self.assertTrue(b.closed)
782
783 def test_close_error_on_close(self):
784 raw = self.MockRawIO()
785 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200786 raise OSError('flush')
Benjamin Peterson68623612012-12-20 11:53:11 -0600787 def bad_close():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200788 raise OSError('close')
Benjamin Peterson68623612012-12-20 11:53:11 -0600789 raw.close = bad_close
790 b = self.tp(raw)
791 b.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200792 with self.assertRaises(OSError) as err: # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600793 b.close()
794 self.assertEqual(err.exception.args, ('close',))
795 self.assertEqual(err.exception.__context__.args, ('flush',))
796 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000797
798 def test_multi_close(self):
799 raw = self.MockRawIO()
800 b = self.tp(raw)
801 b.close()
802 b.close()
803 b.close()
804 self.assertRaises(ValueError, b.flush)
805
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000806 def test_unseekable(self):
807 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
808 self.assertRaises(self.UnsupportedOperation, bufio.tell)
809 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
810
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000811 def test_readonly_attributes(self):
812 raw = self.MockRawIO()
813 buf = self.tp(raw)
814 x = self.MockRawIO()
815 with self.assertRaises(AttributeError):
816 buf.raw = x
817
Guido van Rossum78892e42007-04-06 17:31:18 +0000818
class SizeofTest:
    # Mixin checking how sys.getsizeof() accounts for the internal buffer
    # of ``self.tp``.  CPython-only.

    @support.cpython_only
    def test_sizeof(self):
        # The reported size must grow by exactly the buffer size difference.
        small, large = 4096, 8192
        first = self.tp(self.MockRawIO(), buffer_size=small)
        overhead = sys.getsizeof(first) - small
        second = self.tp(self.MockRawIO(), buffer_size=large)
        self.assertEqual(sys.getsizeof(second), overhead + large)

    @support.cpython_only
    def test_buffer_freeing(self):
        # After close() the buffer no longer counts towards the size.
        bufsize = 4096
        buffered = self.tp(self.MockRawIO(), buffer_size=bufsize)
        overhead = sys.getsizeof(buffered) - bufsize
        buffered.close()
        self.assertEqual(sys.getsizeof(buffered), overhead)
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200840
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Reader-side tests shared by the C and Python implementations; the
    # concrete class under test is read from ``self.tp``.
    read_mode = "rb"

    def test_constructor(self):
        # __init__ may be called again on a live object with valid sizes,
        # and must reject non-positive buffer sizes.
        raw = self.MockRawIO([b"abc"])
        reader = self.tp(raw)
        reader.__init__(raw)
        reader.__init__(raw, buffer_size=1024)
        reader.__init__(raw, buffer_size=16)
        self.assertEqual(b"abc", reader.read())
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, reader.__init__, raw,
                              buffer_size=bad_size)
        raw = self.MockRawIO([b"abc"])
        reader.__init__(raw)
        self.assertEqual(b"abc", reader.read())

    def test_read(self):
        for size in (None, 7):
            raw = self.MockRawIO((b"abc", b"d", b"efg"))
            reader = self.tp(raw)
            self.assertEqual(b"abcdefg", reader.read(size))
        # Invalid args
        self.assertRaises(ValueError, reader.read, -2)

    def test_read1(self):
        raw = self.MockRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw)
        self.assertEqual(b"a", reader.read(1))
        # (expected data, requested size, raw reads issued so far)
        steps = (
            (b"b", 1, 1),
            (b"c", 100, 1),
            (b"d", 100, 2),
            (b"efg", 100, 3),
            (b"", 100, 4),
        )
        for expected, size, raw_reads in steps:
            self.assertEqual(expected, reader.read1(size))
            self.assertEqual(raw._reads, raw_reads)
        # Invalid args
        self.assertRaises(ValueError, reader.read1, -1)

    def test_readinto(self):
        raw = self.MockRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw)
        target = bytearray(2)
        # (bytes reported, resulting buffer contents)
        for count_read, contents in ((2, b"ab"), (2, b"cd"), (2, b"ef"),
                                     (1, b"gf"), (0, b"gf")):
            self.assertEqual(reader.readinto(target), count_read)
            self.assertEqual(target, contents)
        # Same buffer, new stream whose second raw read yields None.
        raw = self.MockRawIO((b"abc", None))
        reader = self.tp(raw)
        for count_read, contents in ((2, b"ab"), (1, b"cb")):
            self.assertEqual(reader.readinto(target), count_read)
            self.assertEqual(target, contents)

    def test_readlines(self):
        def fresh_reader():
            raw = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(raw)

        every_line = [b"abc\n", b"d\n", b"ef"]
        self.assertEqual(fresh_reader().readlines(), every_line)
        self.assertEqual(fresh_reader().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(fresh_reader().readlines(None), every_line)

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # (buffer size, sizes passed to read(), sizes expected on the raw)
        tests = [
            [100, [3, 1, 4, 8], [dlen, 0]],
            [100, [3, 3, 3], [dlen]],
            [4, [1, 2, 4, 2], [4, 4, 1]],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            raw = self.MockFileIO(data)
            reader = self.tp(raw, buffer_size=bufsize)
            offset = 0
            for nbytes in buf_read_sizes:
                chunk = data[offset:offset + nbytes]
                self.assertEqual(reader.read(nbytes), chunk)
                offset += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(raw.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        raw = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        reader = self.tp(raw)
        self.assertEqual(b"abcd", reader.read(6))
        self.assertEqual(b"e", reader.read(1))
        self.assertEqual(b"fg", reader.read())
        self.assertEqual(b"", reader.peek(1))
        self.assertIsNone(reader.read())
        self.assertEqual(b"", reader.read())

        raw = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", raw.readall())
        self.assertIsNone(raw.readall())

    def test_read_past_eof(self):
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", reader.read(9000))

    def test_read_all(self):
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", reader.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(values)
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                reader = self.tp(raw, 8)
                errors = []
                results = []

                def worker():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            chunk = reader.read(n)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as e:
                        errors.append(e)
                        raise

                threads = [threading.Thread(target=worker) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02)  # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                combined = b''.join(results)
                # Every byte value must show up exactly N times overall.
                for i in range(256):
                    self.assertEqual(combined.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        # tell()/seek() stay unsupported both before and after a read.
        reader = self.tp(self.MockUnseekableIO(b"A" * 10))
        unsupported = self.UnsupportedOperation
        self.assertRaises(unsupported, reader.tell)
        self.assertRaises(unsupported, reader.seek, 0)
        reader.read(1)
        self.assertRaises(unsupported, reader.seek, 0)
        self.assertRaises(unsupported, reader.tell)

    def test_misbehaved_io(self):
        reader = self.tp(self.MisbehavedRawIO((b"abc", b"d", b"efg")))
        self.assertRaises(OSError, reader.seek, 0)
        self.assertRaises(OSError, reader.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            wanted = b"x" * n
            # Simple case: one raw read is enough to satisfy the request.
            raw = self.MockRawIO([wanted])
            reader = self.tp(raw, bufsize)
            self.assertEqual(reader.read(n), wanted)
            self.assertEqual(raw._extraneous_reads, 0,
                "failed for {}: {} != 0".format(n, raw._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            raw = self.MockRawIO([b"x" * (n - 1), b"x"])
            reader = self.tp(raw, bufsize)
            self.assertEqual(reader.read(n), wanted)
            self.assertEqual(raw._extraneous_reads, 0,
                "failed for {}: {} != 0".format(n, raw._extraneous_reads))
1036
1037
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the reader tests against the C implementation and adds checks
    # specific to it.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After a failed re-initialisation the object must refuse to read.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(OSError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        #
        # FileIO(..., "w+b") creates TESTFN on disk; register its removal up
        # front so the file does not outlive this test even on failure.
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            # Self-reference: only the cyclic collector can free the object.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1084
1085
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the shared reader tests against the pure-Python implementation.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001088
Guido van Rossuma9e20242007-03-08 00:43:48 +00001089
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Writer-side tests shared by the C and Python implementations; the
    # concrete class under test is read from ``self.tp``.
    write_mode = "wb"

    def test_constructor(self):
        # __init__ may be called again on a live object with valid sizes,
        # and must reject non-positive buffer sizes.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # detach() must push pending buffered data to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # ``intermediate_func`` is called with the buffered object after
        # every individual write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seek away and back (absolute, then relative) between writes.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        # Overwrite the start of the stream after seeking back, then append.
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        l = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # Same as test_writelines, but with a non-list sequence type.
        l = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
        self.assertRaises(TypeError, bufio.writelines, None)
        self.assertRaises(TypeError, bufio.writelines, 'abc')

    def test_destructor(self):
        # Collecting the buffered object must flush what was written.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        #
        # The file is created on disk by this test; register its removal
        # first so it is deleted even when an assertion below fails.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Every byte value must have been written exactly N times.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)
        self.assertRaises(OSError, bufio.write, b"abcdef")

    def test_max_buffer_size_removal(self):
        # A third positional argument is no longer accepted.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        raw = self.MockRawIO()
        def bad_write(b):
            raise OSError()
        raw.write = bad_write
        b = self.tp(raw)
        b.write(b'spam')
        self.assertRaises(OSError, b.close) # exception not swallowed
        self.assertTrue(b.closed)
1337
Benjamin Peterson59406a92009-03-26 17:10:29 +00001338
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the writer tests against the C implementation and adds checks
    # specific to it.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After a failed re-initialisation the object must refuse to write.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        #
        # FileIO(..., "w+b") creates TESTFN on disk; register its removal up
        # front so the file does not outlive this test even on failure.
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            f.write(b"123xxx")
            # Self-reference: only the cyclic collector can free the object.
            f.x = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1382
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001383
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriterTest cases against pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001386
class BufferedRWPairTest(unittest.TestCase):
    # Shared tests for BufferedRWPair.  `self.tp` is the class under test and
    # is set by the concrete subclasses; helpers such as self.MockRawIO,
    # self.BytesIO and self.UnsupportedOperation are not defined here and are
    # expected to be provided from outside this class.

    def test_constructor(self):
        # A freshly built pair is not closed.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_detach(self):
        # detach() is not supported on a pair.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_removal(self):
        # A fourth positional argument is rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        # The first (reader) argument must report itself as readable.
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        # The second (writer) argument must report itself as writable.
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        # read(n), read() and read(None) all pull data from the reader side.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        # `pair` is a factory: every call builds a fresh pair over the same
        # data, so each assertion starts reading from the beginning.
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        # With a hint of 5, only the first two lines are returned.
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        # readinto() fills the supplied buffer and returns the byte count.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        # Each write()+flush() reaches the writer's _write_stack as one entry.
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        # peek() must not consume: the following read() still sees b"abc".
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        # `closed` flips from False to True across close().
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        # Raw stream whose isatty() answer is chosen at construction time.
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # The pair reports a tty as soon as either side does.
        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001503
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPairTest cases against io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001506
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPairTest cases against pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001509
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001510
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Shared tests for BufferedRandom.  Combines the reader and writer test
    # suites and adds tests that interleave reads, writes and seeks on one
    # object.  `self.tp` is set by the concrete subclasses.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Run both inherited constructor tests explicitly.
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", rw.read())
        # After the read, both writes show up as one entry on the raw stream.
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        # Overwrite four bytes at position 4, then re-read the whole stream.
        rw.write(b"123f")
        rw.seek(0, 0)
        self.assertEqual(b"asdf123fl", rw.read())
        self.assertEqual(9, rw.tell())
        # whence=2: relative to the end; whence=1: relative to the current
        # position.
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        rw.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        # A float offset is rejected.
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Helper shared by the test_flush_and_* tests.  `read_func(bufio, n)`
        # (n optional) must return the bytes it consumed from bufio.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        # Modify the raw stream behind the buffered object's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # Without an explicit size, use a buffer of 9999 bytes.
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            # peek() is followed by an explicit seek past the returned bytes
            # so that this helper behaves like a read.
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        # The two writes landed back to back at the start of the stream.
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        # Run both inherited thread tests explicitly.
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    # The test_writes_and_* tests below pass a callback to self.check_writes,
    # which is not defined in this class.

    def test_writes_and_peek(self):
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)
        # Second variant: step back one byte, peek, then restore the position.
        def _peek(bufio):
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek)

    def test_writes_and_reads(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            s = raw.getvalue()
            self.assertEqual(s,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        b = b"\x80\x81\x82\x83\x84"
        for i in range(0, len(b)):
            for j in range(i, len(b)):
                raw = self.BytesIO(b)
                bufio = self.tp(raw, 100)
                mutate(bufio, i, j)
                bufio.flush()
                # Position j gets 2 first and position i gets 1 last, so when
                # i == j the later write (1) wins.
                expected = bytearray(b)
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        # truncate() without an argument returns the current logical position
        # (2 after the read, 4 after the write).
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        # Run both inherited misbehaved-io tests explicitly.
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        # Alternate one-byte writes with one-byte reads of every flavour; each
        # operation must act at the position left by the previous one.
        with self.BytesIO(b'abcdefgh') as raw:
            with self.tp(raw, 100) as f:
                f.write(b"1")
                self.assertEqual(f.read(1), b'b')
                f.write(b'2')
                self.assertEqual(f.read1(1), b'd')
                f.write(b'3')
                buf = bytearray(1)
                f.readinto(buf)
                self.assertEqual(buf, b'f')
                f.write(b'4')
                self.assertEqual(f.peek(1), b'h')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        # Same idea, starting with a read instead of a write.
        with self.BytesIO(b'abc') as raw:
            with self.tp(raw, 100) as f:
                self.assertEqual(f.read(1), b'a')
                f.write(b"2")
                self.assertEqual(f.read(1), b'c')
                f.flush()
                self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        # Each one-byte write replaces the first byte of a line; the following
        # readline() returns the rest of that line.
        with self.BytesIO(b'ab\ncdef\ng\n') as raw:
            with self.tp(raw) as f:
                f.write(b'1')
                self.assertEqual(f.readline(), b'b\n')
                f.write(b'2')
                self.assertEqual(f.readline(), b'def\n')
                f.write(b'3')
                self.assertEqual(f.readline(), b'\n')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    # NOTE(review): presumably this shadows a test_unseekable inherited from
    # the base classes so that it is not run here -- confirm.
    test_unseekable = None
1734
R David Murray67bfe802013-02-23 21:51:05 -05001735
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    # Runs the shared BufferedRandomTest cases (plus SizeofTest) against
    # io.BufferedRandom and adds a few checks of its own.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # On 32-bit builds the huge allocation below may actually succeed
        # (e.g. more than 2GB RAM and a 64-bit kernel), so only check when
        # sys.maxsize is larger than 2**31 - 1.
        if sys.maxsize > 0x7FFFFFFF:
            raw = self.MockRawIO()
            buffered = self.tp(raw)
            with self.assertRaises((OverflowError, MemoryError, ValueError)):
                buffered.__init__(raw, sys.maxsize)

    def test_garbage_collection(self):
        # Reuse the reader check first, then the writer check.
        for c_test in (CBufferedReaderTest, CBufferedWriterTest):
            c_test.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275: the TypeError raised for too many positional
        # arguments must mention "BufferedRandom".
        self.assertRaisesRegex(TypeError, "BufferedRandom",
                               self.tp, io.BytesIO(), 1024, 1024, 1024)
1757
1758
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the shared BufferedRandomTest cases against pyio.BufferedRandom.
    tp = pyio.BufferedRandom
1761
1762
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001763# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1764# properties:
1765# - A single output character can correspond to many bytes of input.
1766# - The number of input bytes to complete the character can be
1767# undetermined until the last input byte is received.
1768# - The number of input bytes can vary depending on previous input.
1769# - A single input byte can correspond to many characters of output.
1770# - The number of output characters can be undetermined until the
1771# last input byte is received.
1772# - The number of output characters can vary depending on previous input.
1773
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.

    A missing number counts as 0, and lengths are clamped to the range
    0..99 so that getstate() can always pack them into a single integer.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and an empty buffer."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered_bytes, flags) with flags == 0 right after reset().

        I and O are each XOR-ed with 1 and packed as i*100 + o, which relies
        on both values being in 0..99.
        """
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a (buffered_bytes, flags) pair produced by getstate()."""
        buffer, io = state
        self.buffer = bytearray(buffer)
        i, o = divmod(io, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Feed the bytes of *input* and return the text completed so far.

        With *final* true, a pending partial word is terminated as well.
        """
        output = ''
        for b in input:
            if self.i == 0: # variable-length, terminated with period
                if b == ord('.'):
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer: # EOF terminates the last word
            output += self.process_word()
        return output

    @staticmethod
    def _parse_length(digits):
        # An empty argument means 0.  Clamp to 0..99: a negative length would
        # break the base-100 packing in getstate() and, for I, select a
        # fixed-length mode that can never complete a word.
        return max(0, min(99, int(digits or 0)))

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        'i...' and 'o...' words update I and O and produce no output; any
        other word is emitted, padded/truncated to O, followed by a period.
        The buffer is emptied afterwards.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = self._parse_length(self.buffer[1:]) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = self._parse_length(self.buffer[1:]) # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o # pad out with hyphens
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # The codec lookup below answers only while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function for the name 'test_decoder'.

        Returns a CodecInfo using latin-1 for encoding and this class as the
        incremental decoder; returns None when the codec is disabled or the
        name does not match.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
1858
# Register the lookup function of the decoder defined above, so that the
# codec name 'test_decoder' can be resolved through the codecs registry.
# Disabled by default (codecEnabled is False); tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1862
1863
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001864class StatefulIncrementalDecoderTest(unittest.TestCase):
1865 """
1866 Make sure the StatefulIncrementalDecoder actually works.
1867 """
1868
1869 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001870 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001871 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001872 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001873 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001874 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001875 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001876 # I=0, O=6 (variable-length input, fixed-length output)
1877 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1878 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001879 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001880 # I=6, O=3 (fixed-length input > fixed-length output)
1881 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1882 # I=0, then 3; O=29, then 15 (with longer output)
1883 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1884 'a----------------------------.' +
1885 'b----------------------------.' +
1886 'cde--------------------------.' +
1887 'abcdefghijabcde.' +
1888 'a.b------------.' +
1889 '.c.------------.' +
1890 'd.e------------.' +
1891 'k--------------.' +
1892 'l--------------.' +
1893 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001894 ]
1895
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001896 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001897 # Try a few one-shot test cases.
1898 for input, eof, output in self.test_cases:
1899 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001900 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001901
1902 # Also test an unfinished decode, followed by forcing EOF.
1903 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001904 self.assertEqual(d.decode(b'oiabcd'), '')
1905 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001906
1907class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001908
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001909 def setUp(self):
1910 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1911 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001912 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001913
Guido van Rossumd0712812007-04-11 16:32:43 +00001914 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001915 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001916
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001917 def test_constructor(self):
1918 r = self.BytesIO(b"\xc3\xa9\n\n")
1919 b = self.BufferedReader(r, 1000)
1920 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001921 t.__init__(b, encoding="latin-1", newline="\r\n")
1922 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001923 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001924 t.__init__(b, encoding="utf-8", line_buffering=True)
1925 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001926 self.assertEqual(t.line_buffering, True)
1927 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001928 self.assertRaises(TypeError, t.__init__, b, newline=42)
1929 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1930
Nick Coghlana9b15242014-02-04 22:11:18 +10001931 def test_non_text_encoding_codecs_are_rejected(self):
1932 # Ensure the constructor complains if passed a codec that isn't
1933 # marked as a text encoding
1934 # http://bugs.python.org/issue20404
1935 r = self.BytesIO()
1936 b = self.BufferedWriter(r)
1937 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
1938 self.TextIOWrapper(b, encoding="hex")
1939
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001940 def test_detach(self):
1941 r = self.BytesIO()
1942 b = self.BufferedWriter(r)
1943 t = self.TextIOWrapper(b)
1944 self.assertIs(t.detach(), b)
1945
1946 t = self.TextIOWrapper(b, encoding="ascii")
1947 t.write("howdy")
1948 self.assertFalse(r.getvalue())
1949 t.detach()
1950 self.assertEqual(r.getvalue(), b"howdy")
1951 self.assertRaises(ValueError, t.detach)
1952
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001953 def test_repr(self):
1954 raw = self.BytesIO("hello".encode("utf-8"))
1955 b = self.BufferedReader(raw)
1956 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001957 modname = self.TextIOWrapper.__module__
1958 self.assertEqual(repr(t),
1959 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1960 raw.name = "dummy"
1961 self.assertEqual(repr(t),
1962 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001963 t.mode = "r"
1964 self.assertEqual(repr(t),
1965 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001966 raw.name = b"dummy"
1967 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001968 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001969
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001970 def test_line_buffering(self):
1971 r = self.BytesIO()
1972 b = self.BufferedWriter(r, 1000)
1973 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001974 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001975 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001976 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001977 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001978 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001979 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001980
Victor Stinnerf86a5e82012-06-05 13:43:22 +02001981 def test_default_encoding(self):
1982 old_environ = dict(os.environ)
1983 try:
1984 # try to get a user preferred encoding different than the current
1985 # locale encoding to check that TextIOWrapper() uses the current
1986 # locale encoding and not the user preferred encoding
1987 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
1988 if key in os.environ:
1989 del os.environ[key]
1990
1991 current_locale_encoding = locale.getpreferredencoding(False)
1992 b = self.BytesIO()
1993 t = self.TextIOWrapper(b)
1994 self.assertEqual(t.encoding, current_locale_encoding)
1995 finally:
1996 os.environ.clear()
1997 os.environ.update(old_environ)
1998
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02001999 @support.cpython_only
Serhiy Storchaka78980432013-01-15 01:12:17 +02002000 def test_device_encoding(self):
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002001 # Issue 15989
2002 import _testcapi
Serhiy Storchaka78980432013-01-15 01:12:17 +02002003 b = self.BytesIO()
2004 b.fileno = lambda: _testcapi.INT_MAX + 1
2005 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2006 b.fileno = lambda: _testcapi.UINT_MAX + 1
2007 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2008
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002009 def test_encoding(self):
2010 # Check the encoding attribute is always set, and valid
2011 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002012 t = self.TextIOWrapper(b, encoding="utf-8")
2013 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002014 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002015 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002016 codecs.lookup(t.encoding)
2017
2018 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002019 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002020 b = self.BytesIO(b"abc\n\xff\n")
2021 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002022 self.assertRaises(UnicodeError, t.read)
2023 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002024 b = self.BytesIO(b"abc\n\xff\n")
2025 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002026 self.assertRaises(UnicodeError, t.read)
2027 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002028 b = self.BytesIO(b"abc\n\xff\n")
2029 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002030 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002031 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002032 b = self.BytesIO(b"abc\n\xff\n")
2033 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002034 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002035
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002036 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002037 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002038 b = self.BytesIO()
2039 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002040 self.assertRaises(UnicodeError, t.write, "\xff")
2041 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002042 b = self.BytesIO()
2043 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002044 self.assertRaises(UnicodeError, t.write, "\xff")
2045 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002046 b = self.BytesIO()
2047 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002048 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002049 t.write("abc\xffdef\n")
2050 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002051 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002052 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002053 b = self.BytesIO()
2054 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002055 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002056 t.write("abc\xffdef\n")
2057 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002058 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002059
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002060 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002061 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2062
2063 tests = [
2064 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002065 [ '', input_lines ],
2066 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2067 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2068 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002069 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002070 encodings = (
2071 'utf-8', 'latin-1',
2072 'utf-16', 'utf-16-le', 'utf-16-be',
2073 'utf-32', 'utf-32-le', 'utf-32-be',
2074 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002075
Guido van Rossum8358db22007-08-18 21:39:55 +00002076 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002077 # character in TextIOWrapper._pending_line.
2078 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002079 # XXX: str.encode() should return bytes
2080 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002081 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002082 for bufsize in range(1, 10):
2083 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002084 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2085 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002086 encoding=encoding)
2087 if do_reads:
2088 got_lines = []
2089 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002090 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002091 if c2 == '':
2092 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002093 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002094 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002095 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002096 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002097
2098 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002099 self.assertEqual(got_line, exp_line)
2100 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002101
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002102 def test_newlines_input(self):
2103 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002104 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2105 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002106 (None, normalized.decode("ascii").splitlines(keepends=True)),
2107 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002108 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2109 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2110 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002111 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002112 buf = self.BytesIO(testdata)
2113 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002114 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002115 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002116 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002117
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002118 def test_newlines_output(self):
2119 testdict = {
2120 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2121 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2122 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2123 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2124 }
2125 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2126 for newline, expected in tests:
2127 buf = self.BytesIO()
2128 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2129 txt.write("AAA\nB")
2130 txt.write("BB\nCCC\n")
2131 txt.write("X\rY\r\nZ")
2132 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002133 self.assertEqual(buf.closed, False)
2134 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002135
def test_destructor(self):
    # Dropping the last reference to the wrapper must close the underlying
    # buffer after the pending text has reached it.
    seen_at_close = []
    parent = self.BytesIO

    class RecordingBytesIO(parent):
        def close(self):
            # Capture the buffer contents at the moment of closing.
            seen_at_close.append(self.getvalue())
            parent.close(self)

    raw = RecordingBytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.write("abc")
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002149
def test_override_destructor(self):
    # A subclass overriding __del__, close and flush must see them called
    # in that order when the object is collected.
    calls = []

    class RecordingTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # Chain to the parent finalizer only when it defines one.
            try:
                parent_del = super().__del__
            except AttributeError:
                return
            parent_del()

        def close(self):
            calls.append(2)
            super().close()

        def flush(self):
            calls.append(3)
            super().flush()

    raw = self.BytesIO()
    wrapper = RecordingTextIO(raw, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2172
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()

    def touch_missing_attribute():
        # The temporary wrapper is dropped while AttributeError propagates.
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    output = captured.getvalue().strip()
    if not output:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(output.splitlines()), 1)
    self.assertTrue(output.startswith("Exception OSError: "), output)
    self.assertTrue(output.endswith(" ignored"), output)
Guido van Rossum8358db22007-08-18 21:39:55 +00002187
Guido van Rossum9b76da62007-04-11 01:09:03 +00002188 # Systematic tests of the text I/O API
2189
def test_basic_io(self):
    # Write, read, seek and tell on a small text file for a range of chunk
    # sizes and encodings.  Files are opened in with-statements so that
    # they are closed even when an assertion fails part-way through.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position at end of file, reused as a seek target below.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                # Append, then read the appended text back from the cookie.
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2218
def multi_line_test(self, f, enc):
    # Rewrite f from scratch with lines of many lengths, then check that
    # readline() returns every line at the position tell() reported when
    # that line was written.  (enc is accepted but not used here.)
    f.seek(0)
    f.truncate()
    alphabet = "s\xff\u0fff\uffff"
    written = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to build a line of `size`.
        text = "".join(alphabet[k % len(alphabet)] for k in range(size)) + "\n"
        written.append((f.tell(), text))
        f.write(text)
    f.seek(0)
    seen = []
    # tell() is taken before each readline(); an empty line ends the loop.
    pos, text = f.tell(), f.readline()
    while text:
        seen.append((pos, text))
        pos, text = f.tell(), f.readline()
    self.assertEqual(seen, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002240
def test_telling(self):
    # tell() must report the same positions when lines are read back as it
    # did when they were written.  The file is opened in a with-statement
    # so that it is closed even when an assertion fails.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is expected to raise while the file is being iterated.
            self.assertRaises(OSError, f.tell)
        # Once iteration has finished, tell() works again.
        self.assertEqual(f.tell(), p2)
2260
def test_seeking(self):
    # The ASCII prefix is two bytes shorter than _default_chunk_size(), so
    # the three-byte character that follows it spans that offset.
    chunk_size = _default_chunk_size()
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    line = prefix + u_suffix.encode("utf-8")
    with self.open(support.TESTFN, "wb") as out:
        out.write(line * 2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as reader:
        head = reader.read(prefix_size)
        self.assertEqual(head, str(prefix, "ascii"))
        self.assertEqual(reader.tell(), prefix_size)
        self.assertEqual(reader.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002277
def test_seeking_too(self):
    # Regression test for a specific bug
    payload = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as out:
        out.write(payload)
    with self.open(support.TESTFN, "r", encoding="utf-8") as reader:
        reader._CHUNK_SIZE  # Just test that it exists
        # A chunk size smaller than the three-byte character in the file.
        reader._CHUNK_SIZE = 2
        reader.readline()
        # The test passes as long as tell() does not raise.
        reader.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002288
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened in a with-statement so that it is closed
        # even when one of the assertions below fails.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1): # seek positions
            for j in [1, 5, len(decoded) - i]: # read lengths
                # NOTE(review): the files reopened here keep their default
                # _CHUNK_SIZE; only the initial decode above uses
                # CHUNK_SIZE -- confirm that this is intended.
                with self.open(support.TESTFN, encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    # Seeking back to the cookie must replay the same text.
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for payload, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(payload)

        # Position each test case so that it crosses a chunk boundary.
        for payload, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(payload)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + payload, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002335
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002336 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002337 data = "1234567890"
2338 tests = ("utf-16",
2339 "utf-16-le",
2340 "utf-16-be",
2341 "utf-32",
2342 "utf-32-le",
2343 "utf-32-be")
2344 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002345 buf = self.BytesIO()
2346 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002347 # Check if the BOM is written only once (see issue1753).
2348 f.write(data)
2349 f.write(data)
2350 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002351 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002352 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002353 self.assertEqual(f.read(), data * 2)
2354 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002355
Benjamin Petersona1b49012009-03-31 23:11:32 +00002356 def test_unreadable(self):
2357 class UnReadable(self.BytesIO):
2358 def readable(self):
2359 return False
2360 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002361 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00002362
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002363 def test_read_one_by_one(self):
2364 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002365 reads = ""
2366 while True:
2367 c = txt.read(1)
2368 if not c:
2369 break
2370 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002371 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002372
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002373 def test_readlines(self):
2374 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2375 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2376 txt.seek(0)
2377 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2378 txt.seek(0)
2379 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2380
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002381 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002382 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002383 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002384 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002385 reads = ""
2386 while True:
2387 c = txt.read(128)
2388 if not c:
2389 break
2390 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002391 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002392
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002393 def test_writelines(self):
2394 l = ['ab', 'cd', 'ef']
2395 buf = self.BytesIO()
2396 txt = self.TextIOWrapper(buf)
2397 txt.writelines(l)
2398 txt.flush()
2399 self.assertEqual(buf.getvalue(), b'abcdef')
2400
2401 def test_writelines_userlist(self):
2402 l = UserList(['ab', 'cd', 'ef'])
2403 buf = self.BytesIO()
2404 txt = self.TextIOWrapper(buf)
2405 txt.writelines(l)
2406 txt.flush()
2407 self.assertEqual(buf.getvalue(), b'abcdef')
2408
2409 def test_writelines_error(self):
2410 txt = self.TextIOWrapper(self.BytesIO())
2411 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2412 self.assertRaises(TypeError, txt.writelines, None)
2413 self.assertRaises(TypeError, txt.writelines, b'abc')
2414
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002415 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002416 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002417
2418 # read one char at a time
2419 reads = ""
2420 while True:
2421 c = txt.read(1)
2422 if not c:
2423 break
2424 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002425 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002426
2427 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002428 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002429 txt._CHUNK_SIZE = 4
2430
2431 reads = ""
2432 while True:
2433 c = txt.read(4)
2434 if not c:
2435 break
2436 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002437 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002438
2439 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002440 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002441 txt._CHUNK_SIZE = 4
2442
2443 reads = txt.read(4)
2444 reads += txt.read(4)
2445 reads += txt.readline()
2446 reads += txt.readline()
2447 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002448 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002449
2450 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002451 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002452 txt._CHUNK_SIZE = 4
2453
2454 reads = txt.read(4)
2455 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002456 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002457
2458 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002459 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002460 txt._CHUNK_SIZE = 4
2461
2462 reads = txt.read(4)
2463 pos = txt.tell()
2464 txt.seek(0)
2465 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002466 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002467
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002468 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002469 buffer = self.BytesIO(self.testdata)
2470 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002471
2472 self.assertEqual(buffer.seekable(), txt.seekable())
2473
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            # The returned position is not needed in this test; tell() is
            # still called so the sequence of operations is unchanged.
            f.tell()
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaa'.encode(charset))

        with self.open(filename, 'a', encoding=charset) as f:
            f.write('xxx')
        # Encoding the whole text at once yields exactly one BOM.
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002488
def test_seek_bom(self):
    # Same test, but when seeking manually
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as writer:
            writer.write('aaa')
            end_of_text = writer.tell()
        with self.open(path, 'r+', encoding=charset) as editor:
            # Append after the existing text, then overwrite the start.
            editor.seek(end_of_text)
            editor.write('zzz')
            editor.seek(0)
            editor.write('bbb')
        with self.open(path, 'rb') as reader:
            self.assertEqual(reader.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002503
def test_errors_property(self):
    # The errors attribute is "strict" when not given, otherwise it is
    # the value passed to open().
    for extra, expected in (({}, "strict"),
                            ({"errors": "replace"}, "replace")):
        with self.open(support.TESTFN, "w", **extra) as f:
            self.assertEqual(f.errors, expected)
2509
@support.no_tracing
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    start_signal = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def worker(n):
            line = "Thread%03d\n" % n
            # Every thread blocks here until the event is set below.
            start_signal.wait()
            f.write(line)

        workers = [threading.Thread(target=worker, args=(x,))
                   for x in range(20)]
        for thread in workers:
            thread.start()
        time.sleep(0.02)
        start_signal.set()
        for thread in workers:
            thread.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
        # Each thread's line must appear exactly once.
        for n in range(20):
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002532
Antoine Pitrou6be88762010-05-03 16:48:20 +00002533 def test_flush_error_on_close(self):
2534 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2535 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002536 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00002537 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002538 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06002539 self.assertTrue(txt.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00002540
2541 def test_multi_close(self):
2542 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2543 txt.close()
2544 txt.close()
2545 txt.close()
2546 self.assertRaises(ValueError, txt.flush)
2547
def test_unseekable(self):
    # tell() and seek() must both raise UnsupportedOperation when the
    # wrapper sits on top of MockUnseekableIO.
    wrapper = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    for operation, args in ((wrapper.tell, ()), (wrapper.seek, (0,))):
        self.assertRaises(self.UnsupportedOperation, operation, *args)
2552
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002553 def test_readonly_attributes(self):
2554 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2555 buf = self.BytesIO(self.testdata)
2556 with self.assertRaises(AttributeError):
2557 txt.buffer = buf
2558
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    wrapper = self.TextIOWrapper(self.MockRawIO(chunks), encoding='ascii',
                                 newline='\n')
    # Reads
    self.assertEqual(wrapper.read(4), 'abcd')
    self.assertEqual(wrapper.readline(), 'efghi\n')
    remaining = list(wrapper)
    self.assertEqual(remaining, ['jkl\n', 'opq\n'])
2569
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                                 write_through=True)
    for piece in ('1', '23\n4', '5'):
        wrapper.write(piece)
    # No flush() was called, yet the raw object already holds every byte.
    self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2579
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002580 def test_read_nonbytes(self):
2581 # Issue #17106
2582 # Crash when underlying read() returns non-bytes
2583 t = self.TextIOWrapper(self.StringIO('a'))
2584 self.assertRaises(TypeError, t.read, 1)
2585 t = self.TextIOWrapper(self.StringIO('a'))
2586 self.assertRaises(TypeError, t.readline)
2587 t = self.TextIOWrapper(self.StringIO('a'))
2588 self.assertRaises(TypeError, t.read)
2589
2590 def test_illegal_decoder(self):
2591 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10002592 # Bypass the early encoding check added in issue 20404
2593 def _make_illegal_wrapper():
2594 quopri = codecs.lookup("quopri")
2595 quopri._is_text_encoding = True
2596 try:
2597 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
2598 newline='\n', encoding="quopri")
2599 finally:
2600 quopri._is_text_encoding = False
2601 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002602 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10002603 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002604 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10002605 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002606 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10002607 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002608 self.assertRaises(TypeError, t.read)
2609
def _check_create_at_shutdown(self, **kwargs):
    # Issue #20037: creating a TextIOWrapper at shutdown
    # shouldn't crash the interpreter.
    #
    # Builds a small script and hands it to assert_python_ok("-c", ...),
    # returning whatever that helper returns (callers unpack it as
    # rc, out, err).  assert_python_ok is defined outside this method;
    # presumably it runs the script in a child interpreter.
    #
    # The io implementation under test is selected by name through the
    # class attribute `io`.
    iomod = self.io.__name__
    # `kwargs` is formatted into the script as a dict literal and unpacked
    # into the TextIOWrapper() call made from C.__del__.
    code = """if 1:
        import codecs
        import {iomod} as io

        # Avoid looking up codecs at shutdown
        codecs.lookup('utf-8')

        class C:
            def __init__(self):
                self.buf = io.BytesIO()
            def __del__(self):
                io.TextIOWrapper(self.buf, **{kwargs})
                print("ok")
        c = C()
        """.format(iomod=iomod, kwargs=kwargs)
    return assert_python_ok("-c", code)
2630
def test_create_at_shutdown_without_encoding(self):
    # Without an explicit encoding, either outcome is accepted: the
    # implementation-specific shutdown error on stderr, or "ok" on stdout.
    _, out, err = self._check_create_at_shutdown()
    if not err:
        self.assertEqual("ok", out.decode().strip())
        return
    # Can error out with a RuntimeError if the module state
    # isn't found.
    self.assertIn(self.shutdown_error, err.decode())
2639
def test_create_at_shutdown_with_encoding(self):
    # With an explicit encoding and error handler, nothing may be written
    # to stderr and the script must print "ok".
    options = {"encoding": 'utf-8', "errors": 'strict'}
    _, out, err = self._check_create_at_shutdown(**options)
    self.assertFalse(err)
    self.assertEqual("ok", out.decode().strip())
2645
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002646
class CTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapperTest suite with `io` bound to the io
    # module, plus a few tests defined only in this class.
    io = io
    # Text looked for in the child's stderr by
    # test_create_at_shutdown_without_encoding when stderr is not empty.
    shutdown_error = "RuntimeError: could not find io module state"

    def test_initialization(self):
        # A failed re-initialisation (bad `newline` type or value) must
        # leave the wrapper in a state where read() raises ValueError.
        r = self.BytesIO(b"\xc3\xa9\n\n")
        b = self.BufferedReader(r, 1000)
        t = self.TextIOWrapper(b)
        self.assertRaises(TypeError, t.__init__, b, newline=42)
        self.assertRaises(ValueError, t.read)
        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
        self.assertRaises(ValueError, t.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            rawio = io.FileIO(support.TESTFN, "wb")
            b = self.BufferedWriter(rawio)
            t = self.TextIOWrapper(b, encoding="ascii")
            t.write("456def")
            # Self-reference: the wrapper is now part of a reference cycle.
            t.x = t
            wr = weakref.ref(t)
            del t
            support.gc_collect()
        # The weak reference is dead once the wrapper has been collected.
        self.assertTrue(wr() is None, wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for i in range(1000):
            b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            t1 = self.TextIOWrapper(b1, encoding="ascii")
            b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            t2 = self.TextIOWrapper(b2, encoding="ascii")
            # circular references
            t1.buddy = t2
            t2.buddy = t1
        support.gc_collect()
2690
2691
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapperTest suite with `io` bound to pyio.
    io = pyio
    # Text looked for in the child's stderr by
    # test_create_at_shutdown_without_encoding when stderr is not empty.
    shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002695
2696
2697class IncrementalNewlineDecoderTest(unittest.TestCase):
2698
def check_newline_decoding_utf8(self, decoder):
    # UTF-8 specific tests for a newline decoder
    def expect(data, text, **kwargs):
        # We exercise getstate() / setstate() as well as decode(): the
        # same input is decoded twice from the same saved state.
        saved = decoder.getstate()
        for attempt in range(2):
            if attempt:
                decoder.setstate(saved)
            self.assertEqual(decoder.decode(data, **kwargs), text)

    def expect_each(steps):
        for data, text in steps:
            expect(data, text)

    # A three-byte character, fed whole and then one byte at a time.
    one_byte_at_a_time = [(b'\xe8', ""), (b'\xa2', ""), (b'\x88', "\u8888")]
    expect(b'\xe8\xa2\x88', "\u8888")
    expect_each(one_byte_at_a_time)
    expect_each(one_byte_at_a_time)

    # A truncated character must be reported when the input is final.
    expect(b'\xe8', "")
    self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

    decoder.reset()
    # A trailing "\r" is held back until more input or final=True arrives.
    expect(b'\n', "\n")
    expect(b'\r', "")
    expect(b'', "\n", final=True)
    expect(b'\r', "\n", final=True)

    expect_each([(b'\r', ""), (b'a', "\na")])

    expect_each([
        (b'\r\r\n', "\n\n"),
        (b'\r', ""),
        (b'\r', "\n"),
        (b'\na', "\na"),
    ])

    # Newlines directly after a multi-byte character.
    expect_each([
        (b'\xe8\xa2\x88\r\n', "\u8888\n"),
        (b'\xe8\xa2\x88', "\u8888"),
        (b'\n', "\n"),
        (b'\xe8\xa2\x88\r', "\u8888"),
        (b'\n', "\n"),
    ])
Guido van Rossum9b76da62007-04-11 01:09:03 +00002740
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002741 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002742 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002743 if encoding is not None:
2744 encoder = codecs.getincrementalencoder(encoding)()
2745 def _decode_bytewise(s):
2746 # Decode one byte at a time
2747 for b in encoder.encode(s):
2748 result.append(decoder.decode(bytes([b])))
2749 else:
2750 encoder = None
2751 def _decode_bytewise(s):
2752 # Decode one char at a time
2753 for c in s:
2754 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002755 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002756 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002757 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00002758 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002759 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002760 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002761 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002762 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002763 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002764 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002765 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002766 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002767 input = "abc"
2768 if encoder is not None:
2769 encoder.reset()
2770 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002771 self.assertEqual(decoder.decode(input), "abc")
2772 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002773
2774 def test_newline_decoder(self):
2775 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002776 # None meaning the IncrementalNewlineDecoder takes unicode input
2777 # rather than bytes input
2778 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002779 'utf-16', 'utf-16-le', 'utf-16-be',
2780 'utf-32', 'utf-32-le', 'utf-32-be',
2781 )
2782 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002783 decoder = enc and codecs.getincrementaldecoder(enc)()
2784 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2785 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002786 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002787 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2788 self.check_newline_decoding_utf8(decoder)
2789
Antoine Pitrou66913e22009-03-06 23:40:56 +00002790 def test_newline_bytes(self):
2791 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2792 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002793 self.assertEqual(dec.newlines, None)
2794 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2795 self.assertEqual(dec.newlines, None)
2796 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2797 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00002798 dec = self.IncrementalNewlineDecoder(None, translate=False)
2799 _check(dec)
2800 dec = self.IncrementalNewlineDecoder(None, translate=True)
2801 _check(dec)
2802
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoderTest variant with a "C" name prefix.

    Adds nothing of its own; load_tests() selects the namespace to inject
    based on that prefix.
    """
2805
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoderTest variant with a "Py" name prefix.

    Adds nothing of its own; load_tests() selects the namespace to inject
    based on that prefix.
    """
Antoine Pitrou180a3362008-12-14 16:36:46 +00002808
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002809
Guido van Rossum01a27522007-03-07 01:00:12 +00002810# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002811
class MiscIOTest(unittest.TestCase):
    """Miscellaneous tests for the io module namespace and open().

    Subclasses set ``io``; the remaining names used through ``self``
    (``open``, ``IOBase``, ``BlockingIOError``, ...) are not defined in
    this class.
    """

    def tearDown(self):
        support.unlink(support.TESTFN)

    def test___all__(self):
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertIsNotNone(obj, name)
            if name == "open":
                continue
            elif "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase), name)

    def test_attributes(self):
        # Every file is opened in a ``with`` block so that it is closed
        # even when one of the assertions fails.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        # Only the open() call itself is expected to emit the warning.
        with support.check_warnings(('', DeprecationWarning)):
            f = self.open(support.TESTFN, "U")
        with f:
            self.assertEqual(f.name, support.TESTFN)
            self.assertEqual(f.buffer.name, support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode, "U")
            self.assertEqual(f.buffer.mode, "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        with self.open(support.TESTFN, "w+") as f:
            self.assertEqual(f.mode, "w+")
            self.assertEqual(f.buffer.mode, "rb+")  # Does it really matter?
            self.assertEqual(f.buffer.raw.mode, "rb+")

            # g wraps f's descriptor without owning it (closefd=False).
            with self.open(f.fileno(), "wb", closefd=False) as g:
                self.assertEqual(g.mode, "wb")
                self.assertEqual(g.raw.mode, "wb")
                self.assertEqual(g.name, f.fileno())
                self.assertEqual(g.raw.name, f.fileno())

    def test_io_after_close(self):
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            # subTest makes a failure report name the offending arguments.
            with self.subTest(**kwargs):
                f = self.open(support.TESTFN, **kwargs)
                f.close()
                self.assertRaises(ValueError, f.flush)
                self.assertRaises(ValueError, f.fileno)
                self.assertRaises(ValueError, f.isatty)
                self.assertRaises(ValueError, f.__iter__)
                # The optional methods are only checked where they exist.
                if hasattr(f, "peek"):
                    self.assertRaises(ValueError, f.peek, 1)
                self.assertRaises(ValueError, f.read)
                if hasattr(f, "read1"):
                    self.assertRaises(ValueError, f.read1, 1024)
                if hasattr(f, "readall"):
                    self.assertRaises(ValueError, f.readall)
                if hasattr(f, "readinto"):
                    self.assertRaises(ValueError, f.readinto, bytearray(1024))
                self.assertRaises(ValueError, f.readline)
                self.assertRaises(ValueError, f.readlines)
                self.assertRaises(ValueError, f.seek, 0)
                self.assertRaises(ValueError, f.tell)
                self.assertRaises(ValueError, f.truncate)
                self.assertRaises(ValueError, f.write,
                                  b"" if "b" in kwargs['mode'] else "")
                self.assertRaises(ValueError, f.writelines, [])
                self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        class C(str):
            pass
        c = C("")
        b = self.BlockingIOError(1, c)
        # Build a reference cycle between the exception and its argument,
        # then check that it is reclaimed once collected.
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        self.assertIsInstance(self.IOBase, abc.ABCMeta)
        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        """Check files from self.open() against the ABCs of *abcmodule*.

        *abcmodule* is any object exposing IOBase, RawIOBase,
        BufferedIOBase and TextIOBase attributes.
        """
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "wb") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "w") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertIsInstance(f, abcmodule.TextIOBase)

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)

    def _check_warn_on_dealloc(self, *args, **kwargs):
        """Check that dropping an unclosed file emits a ResourceWarning
        whose message contains the repr of the file."""
        # NOTE(review): this calls the builtin open() rather than
        # self.open, so the same implementation is exercised by every
        # subclass -- confirm whether that is intended.
        f = open(*args, **kwargs)
        r = repr(f)
        with self.assertWarns(ResourceWarning) as cm:
            f = None
            support.gc_collect()
        self.assertIn(r, str(cm.warning.args[0]))

    def test_warn_on_dealloc(self):
        self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
        self._check_warn_on_dealloc(support.TESTFN, "wb")
        self._check_warn_on_dealloc(support.TESTFN, "w")

    def _check_warn_on_dealloc_fd(self, *args, **kwargs):
        """Same check as _check_warn_on_dealloc, for files opened on a pipe
        file descriptor, plus the closefd=False case."""
        fds = []

        def cleanup_fds():
            # Some descriptors may already have been closed by the file
            # objects; EBADF is therefore ignored.
            for fd in fds:
                try:
                    os.close(fd)
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise
        self.addCleanup(cleanup_fds)
        r, w = os.pipe()
        fds += r, w
        self._check_warn_on_dealloc(r, *args, **kwargs)
        # When using closefd=False, there's no warning
        r, w = os.pipe()
        fds += r, w
        with warnings.catch_warnings(record=True) as recorded:
            open(r, *args, closefd=False, **kwargs)
            support.gc_collect()
        self.assertEqual(recorded, [])

    def test_warn_on_dealloc_fd(self):
        self._check_warn_on_dealloc_fd("rb", buffering=0)
        self._check_warn_on_dealloc_fd("rb")
        self._check_warn_on_dealloc_fd("r")

    def test_pickling(self):
        # Pickling file objects is forbidden
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+b", "buffering": 0},
            ]:
            for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
                with self.subTest(protocol=protocol, **kwargs):
                    with self.open(support.TESTFN, **kwargs) as f:
                        self.assertRaises(TypeError, pickle.dumps, f, protocol)

    @unittest.skipUnless(fcntl, 'fcntl required for this test')
    def test_nonblock_pipe_write_bigbuf(self):
        self._test_nonblock_pipe_write(16*1024)

    @unittest.skipUnless(fcntl, 'fcntl required for this test')
    def test_nonblock_pipe_write_smallbuf(self):
        self._test_nonblock_pipe_write(1024)

    def _set_non_blocking(self, fd):
        """Add O_NONBLOCK to the status flags of *fd*."""
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        self.assertNotEqual(flags, -1)
        res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        self.assertEqual(res, 0)

    def _test_nonblock_pipe_write(self, bufsize):
        """Write through a non-blocking pipe until it blocks, drain it, and
        check that everything reported as written is read back."""
        sent = []
        received = []
        r, w = os.pipe()
        self._set_non_blocking(r)
        self._set_non_blocking(w)

        # To exercise all code paths in the C implementation we need
        # to play with buffer sizes.  For instance, if we choose a
        # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
        # then we will never get a partial write of the buffer.
        rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
        wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

        with rf, wf:
            for N in 9999, 73, 7574:
                try:
                    i = 0
                    while True:
                        msg = bytes([i % 26 + 97]) * N
                        sent.append(msg)
                        wf.write(msg)
                        i += 1

                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.args[2], e.characters_written)
                    # Only the part reported as written counts as sent.
                    sent[-1] = sent[-1][:e.characters_written]
                    received.append(rf.read())
                    msg = b'BLOCKED'
                    wf.write(msg)
                    sent.append(msg)

            # Flush whatever is still buffered, draining the pipe each
            # time the flush would block.
            while True:
                try:
                    wf.flush()
                    break
                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.args[2], e.characters_written)
                    self.assertEqual(e.characters_written, 0)
                    received.append(rf.read())

            # Keep reading until read() returns None.
            received += iter(rf.read, None)

        sent, received = b''.join(sent), b''.join(received)
        # NOTE(review): assertTrue is kept here; presumably to avoid
        # printing both large payloads on failure -- confirm.
        self.assertTrue(sent == received)
        self.assertTrue(wf.closed)
        self.assertTrue(rf.closed)

    def test_create_fail(self):
        # 'x' mode fails if file is existing
        with self.open(support.TESTFN, 'w'):
            pass
        self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')

    def test_create_writes(self):
        # 'x' mode opens for writing
        with self.open(support.TESTFN, 'xb') as f:
            f.write(b"spam")
        with self.open(support.TESTFN, 'rb') as f:
            self.assertEqual(b"spam", f.read())

    def test_open_allargs(self):
        # there used to be a buffer overflow in the parser for rawmode
        self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
3082
3083
class CMiscIOTest(MiscIOTest):
    """MiscIOTest bound to the ``io`` module."""

    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025
        class BadReader(self.io.BufferedIOBase):
            # read() ignores the requested size and always returns 10**6
            # bytes.
            def read(self, n=-1):
                return b'x' * 10**6

        reader = BadReader()
        target = bytearray(2)
        # The oversized result cannot fit into the two-byte buffer.
        with self.assertRaises(ValueError):
            reader.readinto(target)
3095
class PyMiscIOTest(MiscIOTest):
    """MiscIOTest bound to the ``pyio`` module."""

    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003098
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003099
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Tests for reads and writes that are interrupted by SIGALRM.

    Subclasses set ``io`` to the module whose open() is exercised.
    """

    def setUp(self):
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Deliberately raises ZeroDivisionError from the signal handler.
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        *item* is the unit of data written (repeated to exceed the pipe
        size); *bytes* holds the bytes expected at the read end of the
        pipe.
        """
        read_results = []

        def _read():
            if hasattr(signal, 'pthread_sigmask'):
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
            s = os.read(r, 1)
            read_results.append(s)

        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # Bound before the try block so that the cleanup below never hits
        # an unbound name when open() itself fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed just below.  Since
            # the other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            signal.alarm(1)
            try:
                self.assertRaises(
                    ZeroDivisionError, wio.write,
                    item * (support.PIPE_MAX_SIZE // len(item) + 1))
            finally:
                signal.alarm(0)
            t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check the outcome of write() being re-entered from a signal
        handler while a write on the same object is in progress."""
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for _ in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block
                    # further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"),
                                str(exc))
        finally:
            # Cancel any alarm that is still pending so that it cannot
            # fire after this test has finished.
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *decode* converts the value returned by read() to str.
        """
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False

        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound before the try block so that the cleanup below never hits
        # an unbound name when open() itself fails.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            # Cancel any alarm that is still pending so that it cannot
            # fire after this test has finished.
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *item* is a single unit of data; it is repeated
        support.PIPE_MAX_SIZE times to form the payload.
        """
        select = support.import_module("select")
        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = support.PIPE_MAX_SIZE
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False

        def _read():
            while not write_finished:
                while r in select.select([r], [], [], 1.0)[0]:
                    s = os.read(r, 1024)
                    read_results.append(s)

        t = threading.Thread(target=_read)
        t.daemon = True

        def alarm1(sig, frame):
            # First alarm: install the second handler and re-arm the timer.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)

        def alarm2(sig, frame):
            # Second alarm: start the reader so the write can complete.
            t.start()

        signal.signal(signal.SIGALRM, alarm1)
        # Bound before the try block so that the cleanup below never hits
        # an unbound name when open() itself fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            self.assertEqual(N, wio.write(item * N))
            wio.flush()
            write_finished = True
            t.join()
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            # Cancel any alarm that is still pending so that it cannot
            # fire after this test has finished.
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3292
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003293
class CSignalsTest(SignalsTest):
    """SignalsTest bound to the ``io`` module."""

    io = io
3296
class PySignalsTest(SignalsTest):
    """SignalsTest bound to the ``pyio`` module."""

    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = test_reentrant_write_text = None
3304
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003305
def load_tests(*args):
    """Build the test suite for this module.

    Before the suite is assembled, every test class whose name starts with
    "C" or "Py" receives, as class attributes, the public names of the
    matching io implementation plus the matching mock classes.

    The positional arguments are accepted and ignored.  Returns a
    unittest.TestSuite.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name: getattr(io, name) for name in all_members}
    py_io_ns = {name: getattr(pyio, name) for name in all_members}
    globs = globals()
    # Each mock is registered under its base name but resolved to the
    # "C"- or "Py"-prefixed class found in this module's globals.
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            for name, obj in c_io_ns.items():
                setattr(test, name, obj)
        elif test.__name__.startswith("Py"):
            for name, obj in py_io_ns.items():
                setattr(test, name, obj)

    # loadTestsFromTestCase() is the documented replacement for the legacy
    # unittest.makeSuite() helper.
    loader = unittest.defaultTestLoader
    return unittest.TestSuite(
        [loader.loadTestsFromTestCase(test) for test in tests])
Guido van Rossum28524c72007-02-27 05:47:44 +00003341
# Run the tests when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()