blob: ac6d478e32fec0e239f505db02b7722e8f58b516 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000038
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000039import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000040import io # C implementation of io
41import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000042try:
43 import threading
44except ImportError:
45 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010046try:
47 import fcntl
48except ImportError:
49 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000050
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000051def _default_chunk_size():
52 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000053 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000054 return f._CHUNK_SIZE
55
56
Antoine Pitrou328ec742010-09-14 18:37:24 +000057class MockRawIOWithoutRead:
58 """A RawIO implementation without read(), so as to exercise the default
59 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000060
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000061 def __init__(self, read_stack=()):
62 self._read_stack = list(read_stack)
63 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000064 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000065 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000066
Guido van Rossum01a27522007-03-07 01:00:12 +000067 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000068 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000069 return len(b)
70
71 def writable(self):
72 return True
73
Guido van Rossum68bbcd22007-02-27 17:19:33 +000074 def fileno(self):
75 return 42
76
77 def readable(self):
78 return True
79
Guido van Rossum01a27522007-03-07 01:00:12 +000080 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000081 return True
82
Guido van Rossum01a27522007-03-07 01:00:12 +000083 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000084 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000085
86 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000087 return 0 # same comment as above
88
89 def readinto(self, buf):
90 self._reads += 1
91 max_len = len(buf)
92 try:
93 data = self._read_stack[0]
94 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +000095 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000096 return 0
97 if data is None:
98 del self._read_stack[0]
99 return None
100 n = len(data)
101 if len(data) <= max_len:
102 del self._read_stack[0]
103 buf[:n] = data
104 return n
105 else:
106 buf[:] = data[:max_len]
107 self._read_stack[0] = data[max_len:]
108 return max_len
109
110 def truncate(self, pos=None):
111 return pos
112
Antoine Pitrou328ec742010-09-14 18:37:24 +0000113class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
114 pass
115
116class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
117 pass
118
119
120class MockRawIO(MockRawIOWithoutRead):
121
122 def read(self, n=None):
123 self._reads += 1
124 try:
125 return self._read_stack.pop(0)
126 except:
127 self._extraneous_reads += 1
128 return b""
129
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000130class CMockRawIO(MockRawIO, io.RawIOBase):
131 pass
132
133class PyMockRawIO(MockRawIO, pyio.RawIOBase):
134 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000135
Guido van Rossuma9e20242007-03-08 00:43:48 +0000136
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000137class MisbehavedRawIO(MockRawIO):
138 def write(self, b):
139 return super().write(b) * 2
140
141 def read(self, n=None):
142 return super().read(n) * 2
143
144 def seek(self, pos, whence):
145 return -123
146
147 def tell(self):
148 return -456
149
150 def readinto(self, buf):
151 super().readinto(buf)
152 return len(buf) * 5
153
154class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
155 pass
156
157class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
158 pass
159
160
161class CloseFailureIO(MockRawIO):
162 closed = 0
163
164 def close(self):
165 if not self.closed:
166 self.closed = 1
167 raise IOError
168
169class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
170 pass
171
172class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
173 pass
174
175
176class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000177
178 def __init__(self, data):
179 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000180 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000181
182 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000183 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000184 self.read_history.append(None if res is None else len(res))
185 return res
186
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000187 def readinto(self, b):
188 res = super().readinto(b)
189 self.read_history.append(res)
190 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000191
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000192class CMockFileIO(MockFileIO, io.BytesIO):
193 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000194
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000195class PyMockFileIO(MockFileIO, pyio.BytesIO):
196 pass
197
198
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000199class MockUnseekableIO:
200 def seekable(self):
201 return False
202
203 def seek(self, *args):
204 raise self.UnsupportedOperation("not seekable")
205
206 def tell(self, *args):
207 raise self.UnsupportedOperation("not seekable")
208
209class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
210 UnsupportedOperation = io.UnsupportedOperation
211
212class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
213 UnsupportedOperation = pyio.UnsupportedOperation
214
215
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000216class MockNonBlockWriterIO:
217
218 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000219 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000220 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000221
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000222 def pop_written(self):
223 s = b"".join(self._write_stack)
224 self._write_stack[:] = []
225 return s
226
227 def block_on(self, char):
228 """Block when a given char is encountered."""
229 self._blocker_char = char
230
231 def readable(self):
232 return True
233
234 def seekable(self):
235 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000236
Guido van Rossum01a27522007-03-07 01:00:12 +0000237 def writable(self):
238 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000239
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000240 def write(self, b):
241 b = bytes(b)
242 n = -1
243 if self._blocker_char:
244 try:
245 n = b.index(self._blocker_char)
246 except ValueError:
247 pass
248 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100249 if n > 0:
250 # write data up to the first blocker
251 self._write_stack.append(b[:n])
252 return n
253 else:
254 # cancel blocker and indicate would block
255 self._blocker_char = None
256 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000257 self._write_stack.append(b)
258 return len(b)
259
260class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
261 BlockingIOError = io.BlockingIOError
262
263class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
264 BlockingIOError = pyio.BlockingIOError
265
Guido van Rossuma9e20242007-03-08 00:43:48 +0000266
Guido van Rossum28524c72007-02-27 05:47:44 +0000267class IOTest(unittest.TestCase):
268
Neal Norwitze7789b12008-03-24 06:18:09 +0000269 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000270 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000271
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000272 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000273 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000274
Guido van Rossum28524c72007-02-27 05:47:44 +0000275 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000276 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000277 f.truncate(0)
278 self.assertEqual(f.tell(), 5)
279 f.seek(0)
280
281 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000282 self.assertEqual(f.seek(0), 0)
283 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000284 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000285 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000286 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000287 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000288 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000289 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000290 self.assertEqual(f.seek(-1, 2), 13)
291 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000292
Guido van Rossum87429772007-04-10 21:06:59 +0000293 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000294 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000295 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000296
Guido van Rossum9b76da62007-04-11 01:09:03 +0000297 def read_ops(self, f, buffered=False):
298 data = f.read(5)
299 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000300 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000301 self.assertEqual(f.readinto(data), 5)
302 self.assertEqual(data, b" worl")
303 self.assertEqual(f.readinto(data), 2)
304 self.assertEqual(len(data), 5)
305 self.assertEqual(data[:2], b"d\n")
306 self.assertEqual(f.seek(0), 0)
307 self.assertEqual(f.read(20), b"hello world\n")
308 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000309 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000310 self.assertEqual(f.seek(-6, 2), 6)
311 self.assertEqual(f.read(5), b"world")
312 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000313 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000314 self.assertEqual(f.seek(-6, 1), 5)
315 self.assertEqual(f.read(5), b" worl")
316 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000317 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000318 if buffered:
319 f.seek(0)
320 self.assertEqual(f.read(), b"hello world\n")
321 f.seek(6)
322 self.assertEqual(f.read(), b"world\n")
323 self.assertEqual(f.read(), b"")
324
Guido van Rossum34d69e52007-04-10 20:08:41 +0000325 LARGE = 2**31
326
Guido van Rossum53807da2007-04-10 19:01:47 +0000327 def large_file_ops(self, f):
328 assert f.readable()
329 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000330 self.assertEqual(f.seek(self.LARGE), self.LARGE)
331 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000332 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000333 self.assertEqual(f.tell(), self.LARGE + 3)
334 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000335 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000336 self.assertEqual(f.tell(), self.LARGE + 2)
337 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000338 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000339 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000340 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
341 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000342 self.assertEqual(f.read(2), b"x")
343
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000344 def test_invalid_operations(self):
345 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000346 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000347 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000348 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000349 self.assertRaises(exc, fp.read)
350 self.assertRaises(exc, fp.readline)
351 with self.open(support.TESTFN, "wb", buffering=0) as fp:
352 self.assertRaises(exc, fp.read)
353 self.assertRaises(exc, fp.readline)
354 with self.open(support.TESTFN, "rb", buffering=0) as fp:
355 self.assertRaises(exc, fp.write, b"blah")
356 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000357 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000358 self.assertRaises(exc, fp.write, b"blah")
359 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000360 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000361 self.assertRaises(exc, fp.write, "blah")
362 self.assertRaises(exc, fp.writelines, ["blah\n"])
363 # Non-zero seeking from current or end pos
364 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
365 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000366
Antoine Pitrou13348842012-01-29 18:36:34 +0100367 def test_open_handles_NUL_chars(self):
368 fn_with_NUL = 'foo\0bar'
369 self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
370 self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')
371
Guido van Rossum28524c72007-02-27 05:47:44 +0000372 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000373 with self.open(support.TESTFN, "wb", buffering=0) as f:
374 self.assertEqual(f.readable(), False)
375 self.assertEqual(f.writable(), True)
376 self.assertEqual(f.seekable(), True)
377 self.write_ops(f)
378 with self.open(support.TESTFN, "rb", buffering=0) as f:
379 self.assertEqual(f.readable(), True)
380 self.assertEqual(f.writable(), False)
381 self.assertEqual(f.seekable(), True)
382 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000383
Guido van Rossum87429772007-04-10 21:06:59 +0000384 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000385 with self.open(support.TESTFN, "wb") as f:
386 self.assertEqual(f.readable(), False)
387 self.assertEqual(f.writable(), True)
388 self.assertEqual(f.seekable(), True)
389 self.write_ops(f)
390 with self.open(support.TESTFN, "rb") as f:
391 self.assertEqual(f.readable(), True)
392 self.assertEqual(f.writable(), False)
393 self.assertEqual(f.seekable(), True)
394 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000395
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000396 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000397 with self.open(support.TESTFN, "wb") as f:
398 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
399 with self.open(support.TESTFN, "rb") as f:
400 self.assertEqual(f.readline(), b"abc\n")
401 self.assertEqual(f.readline(10), b"def\n")
402 self.assertEqual(f.readline(2), b"xy")
403 self.assertEqual(f.readline(4), b"zzy\n")
404 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000405 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000406 self.assertRaises(TypeError, f.readline, 5.3)
407 with self.open(support.TESTFN, "r") as f:
408 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000409
Guido van Rossum28524c72007-02-27 05:47:44 +0000410 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000411 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000412 self.write_ops(f)
413 data = f.getvalue()
414 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000415 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000416 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000417
Guido van Rossum53807da2007-04-10 19:01:47 +0000418 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000419 # On Windows and Mac OSX this test comsumes large resources; It takes
420 # a long time to build the >2GB file and takes >2GB of disk space
421 # therefore the resource must be enabled to run this test.
422 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600423 support.requires(
424 'largefile',
425 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000426 with self.open(support.TESTFN, "w+b", 0) as f:
427 self.large_file_ops(f)
428 with self.open(support.TESTFN, "w+b") as f:
429 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000430
431 def test_with_open(self):
432 for bufsize in (0, 1, 100):
433 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000434 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000435 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000436 self.assertEqual(f.closed, True)
437 f = None
438 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000439 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000440 1/0
441 except ZeroDivisionError:
442 self.assertEqual(f.closed, True)
443 else:
444 self.fail("1/0 didn't raise an exception")
445
Antoine Pitrou08838b62009-01-21 00:55:13 +0000446 # issue 5008
447 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000448 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000449 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000450 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000451 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000452 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000453 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000454 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000455 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000456
Guido van Rossum87429772007-04-10 21:06:59 +0000457 def test_destructor(self):
458 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000459 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000460 def __del__(self):
461 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000462 try:
463 f = super().__del__
464 except AttributeError:
465 pass
466 else:
467 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000468 def close(self):
469 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000470 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000471 def flush(self):
472 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000473 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000474 with support.check_warnings(('', ResourceWarning)):
475 f = MyFileIO(support.TESTFN, "wb")
476 f.write(b"xxx")
477 del f
478 support.gc_collect()
479 self.assertEqual(record, [1, 2, 3])
480 with self.open(support.TESTFN, "rb") as f:
481 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000482
483 def _check_base_destructor(self, base):
484 record = []
485 class MyIO(base):
486 def __init__(self):
487 # This exercises the availability of attributes on object
488 # destruction.
489 # (in the C version, close() is called by the tp_dealloc
490 # function, not by __del__)
491 self.on_del = 1
492 self.on_close = 2
493 self.on_flush = 3
494 def __del__(self):
495 record.append(self.on_del)
496 try:
497 f = super().__del__
498 except AttributeError:
499 pass
500 else:
501 f()
502 def close(self):
503 record.append(self.on_close)
504 super().close()
505 def flush(self):
506 record.append(self.on_flush)
507 super().flush()
508 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000509 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000510 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000511 self.assertEqual(record, [1, 2, 3])
512
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000513 def test_IOBase_destructor(self):
514 self._check_base_destructor(self.IOBase)
515
516 def test_RawIOBase_destructor(self):
517 self._check_base_destructor(self.RawIOBase)
518
519 def test_BufferedIOBase_destructor(self):
520 self._check_base_destructor(self.BufferedIOBase)
521
522 def test_TextIOBase_destructor(self):
523 self._check_base_destructor(self.TextIOBase)
524
Guido van Rossum87429772007-04-10 21:06:59 +0000525 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000526 with self.open(support.TESTFN, "wb") as f:
527 f.write(b"xxx")
528 with self.open(support.TESTFN, "rb") as f:
529 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000530
Guido van Rossumd4103952007-04-12 05:44:49 +0000531 def test_array_writes(self):
532 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000533 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000534 with self.open(support.TESTFN, "wb", 0) as f:
535 self.assertEqual(f.write(a), n)
536 with self.open(support.TESTFN, "wb") as f:
537 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000538
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000539 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000540 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000541 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000542
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000543 def test_read_closed(self):
544 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000545 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000546 with self.open(support.TESTFN, "r") as f:
547 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000548 self.assertEqual(file.read(), "egg\n")
549 file.seek(0)
550 file.close()
551 self.assertRaises(ValueError, file.read)
552
553 def test_no_closefd_with_filename(self):
554 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000555 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000556
557 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000558 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000559 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000560 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000561 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000562 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000563 self.assertEqual(file.buffer.raw.closefd, False)
564
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000565 def test_garbage_collection(self):
566 # FileIO objects are collected, and collecting them flushes
567 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000568 with support.check_warnings(('', ResourceWarning)):
569 f = self.FileIO(support.TESTFN, "wb")
570 f.write(b"abcxxx")
571 f.f = f
572 wr = weakref.ref(f)
573 del f
574 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000575 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000576 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000577 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000578
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000579 def test_unbounded_file(self):
580 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
581 zero = "/dev/zero"
582 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000583 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000584 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000585 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000586 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000587 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000588 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000589 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000590 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000591 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000592 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000593 self.assertRaises(OverflowError, f.read)
594
Antoine Pitrou6be88762010-05-03 16:48:20 +0000595 def test_flush_error_on_close(self):
596 f = self.open(support.TESTFN, "wb", buffering=0)
597 def bad_flush():
598 raise IOError()
599 f.flush = bad_flush
600 self.assertRaises(IOError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600601 self.assertTrue(f.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000602
603 def test_multi_close(self):
604 f = self.open(support.TESTFN, "wb", buffering=0)
605 f.close()
606 f.close()
607 f.close()
608 self.assertRaises(ValueError, f.flush)
609
Antoine Pitrou328ec742010-09-14 18:37:24 +0000610 def test_RawIOBase_read(self):
611 # Exercise the default RawIOBase.read() implementation (which calls
612 # readinto() internally).
613 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
614 self.assertEqual(rawio.read(2), b"ab")
615 self.assertEqual(rawio.read(2), b"c")
616 self.assertEqual(rawio.read(2), b"d")
617 self.assertEqual(rawio.read(2), None)
618 self.assertEqual(rawio.read(2), b"ef")
619 self.assertEqual(rawio.read(2), b"g")
620 self.assertEqual(rawio.read(2), None)
621 self.assertEqual(rawio.read(2), b"")
622
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400623 def test_types_have_dict(self):
624 test = (
625 self.IOBase(),
626 self.RawIOBase(),
627 self.TextIOBase(),
628 self.StringIO(),
629 self.BytesIO()
630 )
631 for obj in test:
632 self.assertTrue(hasattr(obj, "__dict__"))
633
Ross Lagerwall59142db2011-10-31 20:34:46 +0200634 def test_opener(self):
635 with self.open(support.TESTFN, "w") as f:
636 f.write("egg\n")
637 fd = os.open(support.TESTFN, os.O_RDONLY)
638 def opener(path, flags):
639 return fd
640 with self.open("non-existent", "r", opener=opener) as f:
641 self.assertEqual(f.read(), "egg\n")
642
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200643 def test_fileio_closefd(self):
644 # Issue #4841
645 with self.open(__file__, 'rb') as f1, \
646 self.open(__file__, 'rb') as f2:
647 fileio = self.FileIO(f1.fileno(), closefd=False)
648 # .__init__() must not close f1
649 fileio.__init__(f2.fileno(), closefd=False)
650 f1.readline()
651 # .close() must not close f2
652 fileio.close()
653 f2.readline()
654
655
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000656class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +0200657
658 def test_IOBase_finalize(self):
659 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
660 # class which inherits IOBase and an object of this class are caught
661 # in a reference cycle and close() is already in the method cache.
662 class MyIO(self.IOBase):
663 def close(self):
664 pass
665
666 # create an instance to populate the method cache
667 MyIO()
668 obj = MyIO()
669 obj.obj = obj
670 wr = weakref.ref(obj)
671 del MyIO
672 del obj
673 support.gc_collect()
674 self.assertTrue(wr() is None, wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000675
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000676class PyIOTest(IOTest):
677 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000678
Guido van Rossuma9e20242007-03-08 00:43:48 +0000679
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000680class CommonBufferedTests:
681 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
682
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000683 def test_detach(self):
684 raw = self.MockRawIO()
685 buf = self.tp(raw)
686 self.assertIs(buf.detach(), raw)
687 self.assertRaises(ValueError, buf.detach)
688
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000689 def test_fileno(self):
690 rawio = self.MockRawIO()
691 bufio = self.tp(rawio)
692
Ezio Melottib3aedd42010-11-20 19:04:17 +0000693 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000694
Zachary Ware9fe6d862013-12-08 00:20:35 -0600695 @unittest.skip('test having existential crisis')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000696 def test_no_fileno(self):
697 # XXX will we always have fileno() function? If so, kill
698 # this test. Else, write it.
699 pass
700
701 def test_invalid_args(self):
702 rawio = self.MockRawIO()
703 bufio = self.tp(rawio)
704 # Invalid whence
705 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +0200706 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000707
708 def test_override_destructor(self):
709 tp = self.tp
710 record = []
711 class MyBufferedIO(tp):
712 def __del__(self):
713 record.append(1)
714 try:
715 f = super().__del__
716 except AttributeError:
717 pass
718 else:
719 f()
720 def close(self):
721 record.append(2)
722 super().close()
723 def flush(self):
724 record.append(3)
725 super().flush()
726 rawio = self.MockRawIO()
727 bufio = MyBufferedIO(rawio)
728 writable = bufio.writable()
729 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000730 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000731 if writable:
732 self.assertEqual(record, [1, 2, 3])
733 else:
734 self.assertEqual(record, [1, 2])
735
736 def test_context_manager(self):
737 # Test usability as a context manager
738 rawio = self.MockRawIO()
739 bufio = self.tp(rawio)
740 def _with():
741 with bufio:
742 pass
743 _with()
744 # bufio should now be closed, and using it a second time should raise
745 # a ValueError.
746 self.assertRaises(ValueError, _with)
747
748 def test_error_through_destructor(self):
749 # Test that the exception state is not modified by a destructor,
750 # even if close() fails.
751 rawio = self.CloseFailureIO()
752 def f():
753 self.tp(rawio).xyzzy
754 with support.captured_output("stderr") as s:
755 self.assertRaises(AttributeError, f)
756 s = s.getvalue().strip()
757 if s:
758 # The destructor *may* have printed an unraisable error, check it
759 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000760 self.assertTrue(s.startswith("Exception IOError: "), s)
761 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000762
Antoine Pitrou716c4442009-05-23 19:04:03 +0000763 def test_repr(self):
764 raw = self.MockRawIO()
765 b = self.tp(raw)
766 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
767 self.assertEqual(repr(b), "<%s>" % clsname)
768 raw.name = "dummy"
769 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
770 raw.name = b"dummy"
771 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
772
Antoine Pitrou6be88762010-05-03 16:48:20 +0000773 def test_flush_error_on_close(self):
774 raw = self.MockRawIO()
775 def bad_flush():
776 raise IOError()
777 raw.flush = bad_flush
778 b = self.tp(raw)
779 self.assertRaises(IOError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600780 self.assertTrue(b.closed)
781
782 def test_close_error_on_close(self):
783 raw = self.MockRawIO()
784 def bad_flush():
785 raise IOError('flush')
786 def bad_close():
787 raise IOError('close')
788 raw.close = bad_close
789 b = self.tp(raw)
790 b.flush = bad_flush
791 with self.assertRaises(IOError) as err: # exception not swallowed
792 b.close()
793 self.assertEqual(err.exception.args, ('close',))
794 self.assertEqual(err.exception.__context__.args, ('flush',))
795 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000796
797 def test_multi_close(self):
798 raw = self.MockRawIO()
799 b = self.tp(raw)
800 b.close()
801 b.close()
802 b.close()
803 self.assertRaises(ValueError, b.flush)
804
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000805 def test_unseekable(self):
806 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
807 self.assertRaises(self.UnsupportedOperation, bufio.tell)
808 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
809
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000810 def test_readonly_attributes(self):
811 raw = self.MockRawIO()
812 buf = self.tp(raw)
813 x = self.MockRawIO()
814 with self.assertRaises(AttributeError):
815 buf.raw = x
816
Guido van Rossum78892e42007-04-06 17:31:18 +0000817
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200818class SizeofTest:
819
820 @support.cpython_only
821 def test_sizeof(self):
822 bufsize1 = 4096
823 bufsize2 = 8192
824 rawio = self.MockRawIO()
825 bufio = self.tp(rawio, buffer_size=bufsize1)
826 size = sys.getsizeof(bufio) - bufsize1
827 rawio = self.MockRawIO()
828 bufio = self.tp(rawio, buffer_size=bufsize2)
829 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
830
831
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000832class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
833 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000834
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000835 def test_constructor(self):
836 rawio = self.MockRawIO([b"abc"])
837 bufio = self.tp(rawio)
838 bufio.__init__(rawio)
839 bufio.__init__(rawio, buffer_size=1024)
840 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000841 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000842 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
843 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
844 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
845 rawio = self.MockRawIO([b"abc"])
846 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000847 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000848
Serhiy Storchaka61e24932014-02-12 10:52:35 +0200849 def test_uninitialized(self):
850 bufio = self.tp.__new__(self.tp)
851 del bufio
852 bufio = self.tp.__new__(self.tp)
853 self.assertRaisesRegex((ValueError, AttributeError),
854 'uninitialized|has no attribute',
855 bufio.read, 0)
856 bufio.__init__(self.MockRawIO())
857 self.assertEqual(bufio.read(0), b'')
858
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000859 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000860 for arg in (None, 7):
861 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
862 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000863 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000864 # Invalid args
865 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000866
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000867 def test_read1(self):
868 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
869 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000870 self.assertEqual(b"a", bufio.read(1))
871 self.assertEqual(b"b", bufio.read1(1))
872 self.assertEqual(rawio._reads, 1)
873 self.assertEqual(b"c", bufio.read1(100))
874 self.assertEqual(rawio._reads, 1)
875 self.assertEqual(b"d", bufio.read1(100))
876 self.assertEqual(rawio._reads, 2)
877 self.assertEqual(b"efg", bufio.read1(100))
878 self.assertEqual(rawio._reads, 3)
879 self.assertEqual(b"", bufio.read1(100))
880 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000881 # Invalid args
882 self.assertRaises(ValueError, bufio.read1, -1)
883
884 def test_readinto(self):
885 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
886 bufio = self.tp(rawio)
887 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000888 self.assertEqual(bufio.readinto(b), 2)
889 self.assertEqual(b, b"ab")
890 self.assertEqual(bufio.readinto(b), 2)
891 self.assertEqual(b, b"cd")
892 self.assertEqual(bufio.readinto(b), 2)
893 self.assertEqual(b, b"ef")
894 self.assertEqual(bufio.readinto(b), 1)
895 self.assertEqual(b, b"gf")
896 self.assertEqual(bufio.readinto(b), 0)
897 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +0200898 rawio = self.MockRawIO((b"abc", None))
899 bufio = self.tp(rawio)
900 self.assertEqual(bufio.readinto(b), 2)
901 self.assertEqual(b, b"ab")
902 self.assertEqual(bufio.readinto(b), 1)
903 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000904
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000905 def test_readlines(self):
906 def bufio():
907 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
908 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000909 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
910 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
911 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000912
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000913 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000914 data = b"abcdefghi"
915 dlen = len(data)
916
917 tests = [
918 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
919 [ 100, [ 3, 3, 3], [ dlen ] ],
920 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
921 ]
922
923 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000924 rawio = self.MockFileIO(data)
925 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000926 pos = 0
927 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +0000928 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000929 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000930 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +0000931 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000932
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000933 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000934 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000935 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
936 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000937 self.assertEqual(b"abcd", bufio.read(6))
938 self.assertEqual(b"e", bufio.read(1))
939 self.assertEqual(b"fg", bufio.read())
940 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +0200941 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000942 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000943
Victor Stinnera80987f2011-05-25 22:47:16 +0200944 rawio = self.MockRawIO((b"a", None, None))
945 self.assertEqual(b"a", rawio.readall())
946 self.assertIsNone(rawio.readall())
947
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000948 def test_read_past_eof(self):
949 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
950 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000951
Ezio Melottib3aedd42010-11-20 19:04:17 +0000952 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000953
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000954 def test_read_all(self):
955 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
956 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000957
Ezio Melottib3aedd42010-11-20 19:04:17 +0000958 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000959
Victor Stinner45df8202010-04-28 22:31:17 +0000960 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +0000961 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000962 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000963 try:
964 # Write out many bytes with exactly the same number of 0's,
965 # 1's... 255's. This will help us check that concurrent reading
966 # doesn't duplicate or forget contents.
967 N = 1000
968 l = list(range(256)) * N
969 random.shuffle(l)
970 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000971 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +0000972 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000973 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000974 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000975 errors = []
976 results = []
977 def f():
978 try:
979 # Intra-buffer read then buffer-flushing read
980 for n in cycle([1, 19]):
981 s = bufio.read(n)
982 if not s:
983 break
984 # list.append() is atomic
985 results.append(s)
986 except Exception as e:
987 errors.append(e)
988 raise
989 threads = [threading.Thread(target=f) for x in range(20)]
990 for t in threads:
991 t.start()
992 time.sleep(0.02) # yield
993 for t in threads:
994 t.join()
995 self.assertFalse(errors,
996 "the following exceptions were caught: %r" % errors)
997 s = b''.join(results)
998 for i in range(256):
999 c = bytes(bytearray([i]))
1000 self.assertEqual(s.count(c), N)
1001 finally:
1002 support.unlink(support.TESTFN)
1003
Antoine Pitrou1e44fec2011-10-04 12:26:20 +02001004 def test_unseekable(self):
1005 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1006 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1007 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1008 bufio.read(1)
1009 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1010 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1011
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001012 def test_misbehaved_io(self):
1013 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1014 bufio = self.tp(rawio)
1015 self.assertRaises(IOError, bufio.seek, 0)
1016 self.assertRaises(IOError, bufio.tell)
1017
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001018 def test_no_extraneous_read(self):
1019 # Issue #9550; when the raw IO object has satisfied the read request,
1020 # we should not issue any additional reads, otherwise it may block
1021 # (e.g. socket).
1022 bufsize = 16
1023 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1024 rawio = self.MockRawIO([b"x" * n])
1025 bufio = self.tp(rawio, bufsize)
1026 self.assertEqual(bufio.read(n), b"x" * n)
1027 # Simple case: one raw read is enough to satisfy the request.
1028 self.assertEqual(rawio._extraneous_reads, 0,
1029 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1030 # A more complex case where two raw reads are needed to satisfy
1031 # the request.
1032 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1033 bufio = self.tp(rawio, bufsize)
1034 self.assertEqual(bufio.read(n), b"x" * n)
1035 self.assertEqual(rawio._extraneous_reads, 0,
1036 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1037
1038
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001039class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001040 tp = io.BufferedReader
1041
1042 def test_constructor(self):
1043 BufferedReaderTest.test_constructor(self)
1044 # The allocation can succeed on 32-bit builds, e.g. with more
1045 # than 2GB RAM and a 64-bit kernel.
1046 if sys.maxsize > 0x7FFFFFFF:
1047 rawio = self.MockRawIO()
1048 bufio = self.tp(rawio)
1049 self.assertRaises((OverflowError, MemoryError, ValueError),
1050 bufio.__init__, rawio, sys.maxsize)
1051
1052 def test_initialization(self):
1053 rawio = self.MockRawIO([b"abc"])
1054 bufio = self.tp(rawio)
1055 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1056 self.assertRaises(ValueError, bufio.read)
1057 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1058 self.assertRaises(ValueError, bufio.read)
1059 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1060 self.assertRaises(ValueError, bufio.read)
1061
1062 def test_misbehaved_io_read(self):
1063 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1064 bufio = self.tp(rawio)
1065 # _pyio.BufferedReader seems to implement reading different, so that
1066 # checking this is not so easy.
1067 self.assertRaises(IOError, bufio.read, 10)
1068
1069 def test_garbage_collection(self):
1070 # C BufferedReader objects are collected.
1071 # The Python version has __del__, so it ends into gc.garbage instead
1072 rawio = self.FileIO(support.TESTFN, "w+b")
1073 f = self.tp(rawio)
1074 f.f = f
1075 wr = weakref.ref(f)
1076 del f
Benjamin Peterson45cec322009-04-24 23:14:50 +00001077 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001078 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001079
R David Murray67bfe802013-02-23 21:51:05 -05001080 def test_args_error(self):
1081 # Issue #17275
1082 with self.assertRaisesRegex(TypeError, "BufferedReader"):
1083 self.tp(io.BytesIO(), 1024, 1024, 1024)
1084
1085
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001086class PyBufferedReaderTest(BufferedReaderTest):
1087 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001088
Guido van Rossuma9e20242007-03-08 00:43:48 +00001089
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001090class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1091 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001092
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001093 def test_constructor(self):
1094 rawio = self.MockRawIO()
1095 bufio = self.tp(rawio)
1096 bufio.__init__(rawio)
1097 bufio.__init__(rawio, buffer_size=1024)
1098 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001099 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001100 bufio.flush()
1101 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1102 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1103 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1104 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001105 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001106 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001107 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001108
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001109 def test_uninitialized(self):
1110 bufio = self.tp.__new__(self.tp)
1111 del bufio
1112 bufio = self.tp.__new__(self.tp)
1113 self.assertRaisesRegex((ValueError, AttributeError),
1114 'uninitialized|has no attribute',
1115 bufio.write, b'')
1116 bufio.__init__(self.MockRawIO())
1117 self.assertEqual(bufio.write(b''), 0)
1118
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001119 def test_detach_flush(self):
1120 raw = self.MockRawIO()
1121 buf = self.tp(raw)
1122 buf.write(b"howdy!")
1123 self.assertFalse(raw._write_stack)
1124 buf.detach()
1125 self.assertEqual(raw._write_stack, [b"howdy!"])
1126
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001127 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001128 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001129 writer = self.MockRawIO()
1130 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001131 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001132 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001133
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001134 def test_write_overflow(self):
1135 writer = self.MockRawIO()
1136 bufio = self.tp(writer, 8)
1137 contents = b"abcdefghijklmnop"
1138 for n in range(0, len(contents), 3):
1139 bufio.write(contents[n:n+3])
1140 flushed = b"".join(writer._write_stack)
1141 # At least (total - 8) bytes were implicitly flushed, perhaps more
1142 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001143 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001144
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001145 def check_writes(self, intermediate_func):
1146 # Lots of writes, test the flushed output is as expected.
1147 contents = bytes(range(256)) * 1000
1148 n = 0
1149 writer = self.MockRawIO()
1150 bufio = self.tp(writer, 13)
1151 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1152 def gen_sizes():
1153 for size in count(1):
1154 for i in range(15):
1155 yield size
1156 sizes = gen_sizes()
1157 while n < len(contents):
1158 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001159 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001160 intermediate_func(bufio)
1161 n += size
1162 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001163 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001164
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001165 def test_writes(self):
1166 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001167
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001168 def test_writes_and_flushes(self):
1169 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001170
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001171 def test_writes_and_seeks(self):
1172 def _seekabs(bufio):
1173 pos = bufio.tell()
1174 bufio.seek(pos + 1, 0)
1175 bufio.seek(pos - 1, 0)
1176 bufio.seek(pos, 0)
1177 self.check_writes(_seekabs)
1178 def _seekrel(bufio):
1179 pos = bufio.seek(0, 1)
1180 bufio.seek(+1, 1)
1181 bufio.seek(-1, 1)
1182 bufio.seek(pos, 0)
1183 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001184
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001185 def test_writes_and_truncates(self):
1186 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001187
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001188 def test_write_non_blocking(self):
1189 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001190 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001191
Ezio Melottib3aedd42010-11-20 19:04:17 +00001192 self.assertEqual(bufio.write(b"abcd"), 4)
1193 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001194 # 1 byte will be written, the rest will be buffered
1195 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001196 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001197
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001198 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1199 raw.block_on(b"0")
1200 try:
1201 bufio.write(b"opqrwxyz0123456789")
1202 except self.BlockingIOError as e:
1203 written = e.characters_written
1204 else:
1205 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001206 self.assertEqual(written, 16)
1207 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001208 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001209
Ezio Melottib3aedd42010-11-20 19:04:17 +00001210 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001211 s = raw.pop_written()
1212 # Previously buffered bytes were flushed
1213 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001214
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001215 def test_write_and_rewind(self):
1216 raw = io.BytesIO()
1217 bufio = self.tp(raw, 4)
1218 self.assertEqual(bufio.write(b"abcdef"), 6)
1219 self.assertEqual(bufio.tell(), 6)
1220 bufio.seek(0, 0)
1221 self.assertEqual(bufio.write(b"XY"), 2)
1222 bufio.seek(6, 0)
1223 self.assertEqual(raw.getvalue(), b"XYcdef")
1224 self.assertEqual(bufio.write(b"123456"), 6)
1225 bufio.flush()
1226 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001227
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001228 def test_flush(self):
1229 writer = self.MockRawIO()
1230 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001231 bufio.write(b"abc")
1232 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001233 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001234
Antoine Pitrou131a4892012-10-16 22:57:11 +02001235 def test_writelines(self):
1236 l = [b'ab', b'cd', b'ef']
1237 writer = self.MockRawIO()
1238 bufio = self.tp(writer, 8)
1239 bufio.writelines(l)
1240 bufio.flush()
1241 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1242
1243 def test_writelines_userlist(self):
1244 l = UserList([b'ab', b'cd', b'ef'])
1245 writer = self.MockRawIO()
1246 bufio = self.tp(writer, 8)
1247 bufio.writelines(l)
1248 bufio.flush()
1249 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1250
1251 def test_writelines_error(self):
1252 writer = self.MockRawIO()
1253 bufio = self.tp(writer, 8)
1254 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1255 self.assertRaises(TypeError, bufio.writelines, None)
1256 self.assertRaises(TypeError, bufio.writelines, 'abc')
1257
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001258 def test_destructor(self):
1259 writer = self.MockRawIO()
1260 bufio = self.tp(writer, 8)
1261 bufio.write(b"abc")
1262 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001263 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001264 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001265
1266 def test_truncate(self):
1267 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001268 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001269 bufio = self.tp(raw, 8)
1270 bufio.write(b"abcdef")
1271 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001272 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001273 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001274 self.assertEqual(f.read(), b"abc")
1275
Victor Stinner45df8202010-04-28 22:31:17 +00001276 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001277 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001278 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001279 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001280 # Write out many bytes from many threads and test they were
1281 # all flushed.
1282 N = 1000
1283 contents = bytes(range(256)) * N
1284 sizes = cycle([1, 19])
1285 n = 0
1286 queue = deque()
1287 while n < len(contents):
1288 size = next(sizes)
1289 queue.append(contents[n:n+size])
1290 n += size
1291 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001292 # We use a real file object because it allows us to
1293 # exercise situations where the GIL is released before
1294 # writing the buffer to the raw streams. This is in addition
1295 # to concurrency issues due to switching threads in the middle
1296 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001297 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001298 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001299 errors = []
1300 def f():
1301 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001302 while True:
1303 try:
1304 s = queue.popleft()
1305 except IndexError:
1306 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001307 bufio.write(s)
1308 except Exception as e:
1309 errors.append(e)
1310 raise
1311 threads = [threading.Thread(target=f) for x in range(20)]
1312 for t in threads:
1313 t.start()
1314 time.sleep(0.02) # yield
1315 for t in threads:
1316 t.join()
1317 self.assertFalse(errors,
1318 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001319 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001320 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001321 s = f.read()
1322 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001323 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001324 finally:
1325 support.unlink(support.TESTFN)
1326
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001327 def test_misbehaved_io(self):
1328 rawio = self.MisbehavedRawIO()
1329 bufio = self.tp(rawio, 5)
1330 self.assertRaises(IOError, bufio.seek, 0)
1331 self.assertRaises(IOError, bufio.tell)
1332 self.assertRaises(IOError, bufio.write, b"abcdef")
1333
Florent Xicluna109d5732012-07-07 17:03:22 +02001334 def test_max_buffer_size_removal(self):
1335 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001336 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001337
Benjamin Peterson68623612012-12-20 11:53:11 -06001338 def test_write_error_on_close(self):
1339 raw = self.MockRawIO()
1340 def bad_write(b):
1341 raise IOError()
1342 raw.write = bad_write
1343 b = self.tp(raw)
1344 b.write(b'spam')
1345 self.assertRaises(IOError, b.close) # exception not swallowed
1346 self.assertTrue(b.closed)
1347
Benjamin Peterson59406a92009-03-26 17:10:29 +00001348
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001349class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001350 tp = io.BufferedWriter
1351
1352 def test_constructor(self):
1353 BufferedWriterTest.test_constructor(self)
1354 # The allocation can succeed on 32-bit builds, e.g. with more
1355 # than 2GB RAM and a 64-bit kernel.
1356 if sys.maxsize > 0x7FFFFFFF:
1357 rawio = self.MockRawIO()
1358 bufio = self.tp(rawio)
1359 self.assertRaises((OverflowError, MemoryError, ValueError),
1360 bufio.__init__, rawio, sys.maxsize)
1361
1362 def test_initialization(self):
1363 rawio = self.MockRawIO()
1364 bufio = self.tp(rawio)
1365 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1366 self.assertRaises(ValueError, bufio.write, b"def")
1367 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1368 self.assertRaises(ValueError, bufio.write, b"def")
1369 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1370 self.assertRaises(ValueError, bufio.write, b"def")
1371
1372 def test_garbage_collection(self):
1373 # C BufferedWriter objects are collected, and collecting them flushes
1374 # all data to disk.
1375 # The Python version has __del__, so it ends into gc.garbage instead
1376 rawio = self.FileIO(support.TESTFN, "w+b")
1377 f = self.tp(rawio)
1378 f.write(b"123xxx")
1379 f.x = f
1380 wr = weakref.ref(f)
1381 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001382 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001383 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001384 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001385 self.assertEqual(f.read(), b"123xxx")
1386
R David Murray67bfe802013-02-23 21:51:05 -05001387 def test_args_error(self):
1388 # Issue #17275
1389 with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1390 self.tp(io.BytesIO(), 1024, 1024, 1024)
1391
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001392
1393class PyBufferedWriterTest(BufferedWriterTest):
1394 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001395
Guido van Rossum01a27522007-03-07 01:00:12 +00001396class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001397
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001398 def test_constructor(self):
1399 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001400 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001401
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001402 def test_uninitialized(self):
1403 pair = self.tp.__new__(self.tp)
1404 del pair
1405 pair = self.tp.__new__(self.tp)
1406 self.assertRaisesRegex((ValueError, AttributeError),
1407 'uninitialized|has no attribute',
1408 pair.read, 0)
1409 self.assertRaisesRegex((ValueError, AttributeError),
1410 'uninitialized|has no attribute',
1411 pair.write, b'')
1412 pair.__init__(self.MockRawIO(), self.MockRawIO())
1413 self.assertEqual(pair.read(0), b'')
1414 self.assertEqual(pair.write(b''), 0)
1415
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001416 def test_detach(self):
1417 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1418 self.assertRaises(self.UnsupportedOperation, pair.detach)
1419
Florent Xicluna109d5732012-07-07 17:03:22 +02001420 def test_constructor_max_buffer_size_removal(self):
1421 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001422 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001423
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001424 def test_constructor_with_not_readable(self):
1425 class NotReadable(MockRawIO):
1426 def readable(self):
1427 return False
1428
1429 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1430
1431 def test_constructor_with_not_writeable(self):
1432 class NotWriteable(MockRawIO):
1433 def writable(self):
1434 return False
1435
1436 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1437
1438 def test_read(self):
1439 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1440
1441 self.assertEqual(pair.read(3), b"abc")
1442 self.assertEqual(pair.read(1), b"d")
1443 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001444 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1445 self.assertEqual(pair.read(None), b"abc")
1446
1447 def test_readlines(self):
1448 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1449 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1450 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1451 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001452
1453 def test_read1(self):
1454 # .read1() is delegated to the underlying reader object, so this test
1455 # can be shallow.
1456 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1457
1458 self.assertEqual(pair.read1(3), b"abc")
1459
1460 def test_readinto(self):
1461 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1462
1463 data = bytearray(5)
1464 self.assertEqual(pair.readinto(data), 5)
1465 self.assertEqual(data, b"abcde")
1466
1467 def test_write(self):
1468 w = self.MockRawIO()
1469 pair = self.tp(self.MockRawIO(), w)
1470
1471 pair.write(b"abc")
1472 pair.flush()
1473 pair.write(b"def")
1474 pair.flush()
1475 self.assertEqual(w._write_stack, [b"abc", b"def"])
1476
1477 def test_peek(self):
1478 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1479
1480 self.assertTrue(pair.peek(3).startswith(b"abc"))
1481 self.assertEqual(pair.read(3), b"abc")
1482
1483 def test_readable(self):
1484 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1485 self.assertTrue(pair.readable())
1486
1487 def test_writeable(self):
1488 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1489 self.assertTrue(pair.writable())
1490
1491 def test_seekable(self):
1492 # BufferedRWPairs are never seekable, even if their readers and writers
1493 # are.
1494 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1495 self.assertFalse(pair.seekable())
1496
1497 # .flush() is delegated to the underlying writer object and has been
1498 # tested in the test_write method.
1499
1500 def test_close_and_closed(self):
1501 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1502 self.assertFalse(pair.closed)
1503 pair.close()
1504 self.assertTrue(pair.closed)
1505
1506 def test_isatty(self):
1507 class SelectableIsAtty(MockRawIO):
1508 def __init__(self, isatty):
1509 MockRawIO.__init__(self)
1510 self._isatty = isatty
1511
1512 def isatty(self):
1513 return self._isatty
1514
1515 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1516 self.assertFalse(pair.isatty())
1517
1518 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1519 self.assertTrue(pair.isatty())
1520
1521 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1522 self.assertTrue(pair.isatty())
1523
1524 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1525 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001526
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001527class CBufferedRWPairTest(BufferedRWPairTest):
1528 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001529
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001530class PyBufferedRWPairTest(BufferedRWPairTest):
1531 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001532
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001533
1534class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1535 read_mode = "rb+"
1536 write_mode = "wb+"
1537
1538 def test_constructor(self):
1539 BufferedReaderTest.test_constructor(self)
1540 BufferedWriterTest.test_constructor(self)
1541
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001542 def test_uninitialized(self):
1543 BufferedReaderTest.test_uninitialized(self)
1544 BufferedWriterTest.test_uninitialized(self)
1545
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001546 def test_read_and_write(self):
1547 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001548 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001549
1550 self.assertEqual(b"as", rw.read(2))
1551 rw.write(b"ddd")
1552 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001553 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001554 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001555 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001556
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001557 def test_seek_and_tell(self):
1558 raw = self.BytesIO(b"asdfghjkl")
1559 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001560
Ezio Melottib3aedd42010-11-20 19:04:17 +00001561 self.assertEqual(b"as", rw.read(2))
1562 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001563 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001564 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00001565
Antoine Pitroue05565e2011-08-20 14:39:23 +02001566 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00001567 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02001568 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001569 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001570 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001571 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001572 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001573 self.assertEqual(7, rw.tell())
1574 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02001575 rw.flush()
1576 self.assertEqual(b"asdf123fl", raw.getvalue())
1577
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001578 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001579
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001580 def check_flush_and_read(self, read_func):
1581 raw = self.BytesIO(b"abcdefghi")
1582 bufio = self.tp(raw)
1583
Ezio Melottib3aedd42010-11-20 19:04:17 +00001584 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001585 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001586 self.assertEqual(b"ef", read_func(bufio, 2))
1587 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001588 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001589 self.assertEqual(6, bufio.tell())
1590 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001591 raw.seek(0, 0)
1592 raw.write(b"XYZ")
1593 # flush() resets the read buffer
1594 bufio.flush()
1595 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001596 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001597
1598 def test_flush_and_read(self):
1599 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1600
1601 def test_flush_and_readinto(self):
1602 def _readinto(bufio, n=-1):
1603 b = bytearray(n if n >= 0 else 9999)
1604 n = bufio.readinto(b)
1605 return bytes(b[:n])
1606 self.check_flush_and_read(_readinto)
1607
1608 def test_flush_and_peek(self):
1609 def _peek(bufio, n=-1):
1610 # This relies on the fact that the buffer can contain the whole
1611 # raw stream, otherwise peek() can return less.
1612 b = bufio.peek(n)
1613 if n != -1:
1614 b = b[:n]
1615 bufio.seek(len(b), 1)
1616 return b
1617 self.check_flush_and_read(_peek)
1618
1619 def test_flush_and_write(self):
1620 raw = self.BytesIO(b"abcdefghi")
1621 bufio = self.tp(raw)
1622
1623 bufio.write(b"123")
1624 bufio.flush()
1625 bufio.write(b"45")
1626 bufio.flush()
1627 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001628 self.assertEqual(b"12345fghi", raw.getvalue())
1629 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001630
1631 def test_threads(self):
1632 BufferedReaderTest.test_threads(self)
1633 BufferedWriterTest.test_threads(self)
1634
1635 def test_writes_and_peek(self):
1636 def _peek(bufio):
1637 bufio.peek(1)
1638 self.check_writes(_peek)
1639 def _peek(bufio):
1640 pos = bufio.tell()
1641 bufio.seek(-1, 1)
1642 bufio.peek(1)
1643 bufio.seek(pos, 0)
1644 self.check_writes(_peek)
1645
1646 def test_writes_and_reads(self):
1647 def _read(bufio):
1648 bufio.seek(-1, 1)
1649 bufio.read(1)
1650 self.check_writes(_read)
1651
1652 def test_writes_and_read1s(self):
1653 def _read1(bufio):
1654 bufio.seek(-1, 1)
1655 bufio.read1(1)
1656 self.check_writes(_read1)
1657
1658 def test_writes_and_readintos(self):
1659 def _read(bufio):
1660 bufio.seek(-1, 1)
1661 bufio.readinto(bytearray(1))
1662 self.check_writes(_read)
1663
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001664 def test_write_after_readahead(self):
1665 # Issue #6629: writing after the buffer was filled by readahead should
1666 # first rewind the raw stream.
1667 for overwrite_size in [1, 5]:
1668 raw = self.BytesIO(b"A" * 10)
1669 bufio = self.tp(raw, 4)
1670 # Trigger readahead
1671 self.assertEqual(bufio.read(1), b"A")
1672 self.assertEqual(bufio.tell(), 1)
1673 # Overwriting should rewind the raw stream if it needs so
1674 bufio.write(b"B" * overwrite_size)
1675 self.assertEqual(bufio.tell(), overwrite_size + 1)
1676 # If the write size was smaller than the buffer size, flush() and
1677 # check that rewind happens.
1678 bufio.flush()
1679 self.assertEqual(bufio.tell(), overwrite_size + 1)
1680 s = raw.getvalue()
1681 self.assertEqual(s,
1682 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1683
Antoine Pitrou7c404892011-05-13 00:13:33 +02001684 def test_write_rewind_write(self):
1685 # Various combinations of reading / writing / seeking backwards / writing again
1686 def mutate(bufio, pos1, pos2):
1687 assert pos2 >= pos1
1688 # Fill the buffer
1689 bufio.seek(pos1)
1690 bufio.read(pos2 - pos1)
1691 bufio.write(b'\x02')
1692 # This writes earlier than the previous write, but still inside
1693 # the buffer.
1694 bufio.seek(pos1)
1695 bufio.write(b'\x01')
1696
1697 b = b"\x80\x81\x82\x83\x84"
1698 for i in range(0, len(b)):
1699 for j in range(i, len(b)):
1700 raw = self.BytesIO(b)
1701 bufio = self.tp(raw, 100)
1702 mutate(bufio, i, j)
1703 bufio.flush()
1704 expected = bytearray(b)
1705 expected[j] = 2
1706 expected[i] = 1
1707 self.assertEqual(raw.getvalue(), expected,
1708 "failed result for i=%d, j=%d" % (i, j))
1709
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001710 def test_truncate_after_read_or_write(self):
1711 raw = self.BytesIO(b"A" * 10)
1712 bufio = self.tp(raw, 100)
1713 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1714 self.assertEqual(bufio.truncate(), 2)
1715 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1716 self.assertEqual(bufio.truncate(), 4)
1717
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001718 def test_misbehaved_io(self):
1719 BufferedReaderTest.test_misbehaved_io(self)
1720 BufferedWriterTest.test_misbehaved_io(self)
1721
Antoine Pitroue05565e2011-08-20 14:39:23 +02001722 def test_interleaved_read_write(self):
1723 # Test for issue #12213
1724 with self.BytesIO(b'abcdefgh') as raw:
1725 with self.tp(raw, 100) as f:
1726 f.write(b"1")
1727 self.assertEqual(f.read(1), b'b')
1728 f.write(b'2')
1729 self.assertEqual(f.read1(1), b'd')
1730 f.write(b'3')
1731 buf = bytearray(1)
1732 f.readinto(buf)
1733 self.assertEqual(buf, b'f')
1734 f.write(b'4')
1735 self.assertEqual(f.peek(1), b'h')
1736 f.flush()
1737 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1738
1739 with self.BytesIO(b'abc') as raw:
1740 with self.tp(raw, 100) as f:
1741 self.assertEqual(f.read(1), b'a')
1742 f.write(b"2")
1743 self.assertEqual(f.read(1), b'c')
1744 f.flush()
1745 self.assertEqual(raw.getvalue(), b'a2c')
1746
1747 def test_interleaved_readline_write(self):
1748 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1749 with self.tp(raw) as f:
1750 f.write(b'1')
1751 self.assertEqual(f.readline(), b'b\n')
1752 f.write(b'2')
1753 self.assertEqual(f.readline(), b'def\n')
1754 f.write(b'3')
1755 self.assertEqual(f.readline(), b'\n')
1756 f.flush()
1757 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1758
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001759 # You can't construct a BufferedRandom over a non-seekable stream.
1760 test_unseekable = None
1761
R David Murray67bfe802013-02-23 21:51:05 -05001762
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001763class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001764 tp = io.BufferedRandom
1765
1766 def test_constructor(self):
1767 BufferedRandomTest.test_constructor(self)
1768 # The allocation can succeed on 32-bit builds, e.g. with more
1769 # than 2GB RAM and a 64-bit kernel.
1770 if sys.maxsize > 0x7FFFFFFF:
1771 rawio = self.MockRawIO()
1772 bufio = self.tp(rawio)
1773 self.assertRaises((OverflowError, MemoryError, ValueError),
1774 bufio.__init__, rawio, sys.maxsize)
1775
1776 def test_garbage_collection(self):
1777 CBufferedReaderTest.test_garbage_collection(self)
1778 CBufferedWriterTest.test_garbage_collection(self)
1779
R David Murray67bfe802013-02-23 21:51:05 -05001780 def test_args_error(self):
1781 # Issue #17275
1782 with self.assertRaisesRegex(TypeError, "BufferedRandom"):
1783 self.tp(io.BytesIO(), 1024, 1024, 1024)
1784
1785
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001786class PyBufferedRandomTest(BufferedRandomTest):
1787 tp = pyio.BufferedRandom
1788
1789
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001790# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1791# properties:
1792# - A single output character can correspond to many bytes of input.
1793# - The number of input bytes to complete the character can be
1794# undetermined until the last input byte is received.
1795# - The number of input bytes can vary depending on previous input.
1796# - A single input byte can correspond to many characters of output.
1797# - The number of output characters can be undetermined until the
1798# last input byte is received.
1799# - The number of output characters can vary depending on previous input.
1800
1801class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1802 """
1803 For testing seek/tell behavior with a stateful, buffering decoder.
1804
1805 Input is a sequence of words. Words may be fixed-length (length set
1806 by input) or variable-length (period-terminated). In variable-length
1807 mode, extra periods are ignored. Possible words are:
1808 - 'i' followed by a number sets the input length, I (maximum 99).
1809 When I is set to 0, words are space-terminated.
1810 - 'o' followed by a number sets the output length, O (maximum 99).
1811 - Any other word is converted into a word followed by a period on
1812 the output. The output word consists of the input word truncated
1813 or padded out with hyphens to make its length equal to O. If O
1814 is 0, the word is output verbatim without truncating or padding.
1815 I and O are initially set to 1. When I changes, any buffered input is
1816 re-scanned according to the new I. EOF also terminates the last word.
1817 """
1818
1819 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001820 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001821 self.reset()
1822
1823 def __repr__(self):
1824 return '<SID %x>' % id(self)
1825
1826 def reset(self):
1827 self.i = 1
1828 self.o = 1
1829 self.buffer = bytearray()
1830
1831 def getstate(self):
1832 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1833 return bytes(self.buffer), i*100 + o
1834
1835 def setstate(self, state):
1836 buffer, io = state
1837 self.buffer = bytearray(buffer)
1838 i, o = divmod(io, 100)
1839 self.i, self.o = i ^ 1, o ^ 1
1840
1841 def decode(self, input, final=False):
1842 output = ''
1843 for b in input:
1844 if self.i == 0: # variable-length, terminated with period
1845 if b == ord('.'):
1846 if self.buffer:
1847 output += self.process_word()
1848 else:
1849 self.buffer.append(b)
1850 else: # fixed-length, terminate after self.i bytes
1851 self.buffer.append(b)
1852 if len(self.buffer) == self.i:
1853 output += self.process_word()
1854 if final and self.buffer: # EOF terminates the last word
1855 output += self.process_word()
1856 return output
1857
1858 def process_word(self):
1859 output = ''
1860 if self.buffer[0] == ord('i'):
1861 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1862 elif self.buffer[0] == ord('o'):
1863 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1864 else:
1865 output = self.buffer.decode('ascii')
1866 if len(output) < self.o:
1867 output += '-'*self.o # pad out with hyphens
1868 if self.o:
1869 output = output[:self.o] # truncate to output length
1870 output += '.'
1871 self.buffer = bytearray()
1872 return output
1873
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001874 codecEnabled = False
1875
1876 @classmethod
1877 def lookupTestDecoder(cls, name):
1878 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001879 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001880 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001881 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001882 incrementalencoder=None,
1883 streamreader=None, streamwriter=None,
1884 incrementaldecoder=cls)
1885
1886# Register the previous decoder for testing.
1887# Disabled by default, tests will enable it.
1888codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1889
1890
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001891class StatefulIncrementalDecoderTest(unittest.TestCase):
1892 """
1893 Make sure the StatefulIncrementalDecoder actually works.
1894 """
1895
1896 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001897 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001898 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001899 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001900 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001901 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001902 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001903 # I=0, O=6 (variable-length input, fixed-length output)
1904 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1905 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001906 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001907 # I=6, O=3 (fixed-length input > fixed-length output)
1908 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1909 # I=0, then 3; O=29, then 15 (with longer output)
1910 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1911 'a----------------------------.' +
1912 'b----------------------------.' +
1913 'cde--------------------------.' +
1914 'abcdefghijabcde.' +
1915 'a.b------------.' +
1916 '.c.------------.' +
1917 'd.e------------.' +
1918 'k--------------.' +
1919 'l--------------.' +
1920 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001921 ]
1922
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001923 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001924 # Try a few one-shot test cases.
1925 for input, eof, output in self.test_cases:
1926 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001927 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001928
1929 # Also test an unfinished decode, followed by forcing EOF.
1930 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001931 self.assertEqual(d.decode(b'oiabcd'), '')
1932 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001933
1934class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001935
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001936 def setUp(self):
1937 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1938 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001939 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001940
Guido van Rossumd0712812007-04-11 16:32:43 +00001941 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001942 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001943
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001944 def test_constructor(self):
1945 r = self.BytesIO(b"\xc3\xa9\n\n")
1946 b = self.BufferedReader(r, 1000)
1947 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001948 t.__init__(b, encoding="latin-1", newline="\r\n")
1949 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001950 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001951 t.__init__(b, encoding="utf-8", line_buffering=True)
1952 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001953 self.assertEqual(t.line_buffering, True)
1954 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001955 self.assertRaises(TypeError, t.__init__, b, newline=42)
1956 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1957
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001958 def test_detach(self):
1959 r = self.BytesIO()
1960 b = self.BufferedWriter(r)
1961 t = self.TextIOWrapper(b)
1962 self.assertIs(t.detach(), b)
1963
1964 t = self.TextIOWrapper(b, encoding="ascii")
1965 t.write("howdy")
1966 self.assertFalse(r.getvalue())
1967 t.detach()
1968 self.assertEqual(r.getvalue(), b"howdy")
1969 self.assertRaises(ValueError, t.detach)
1970
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001971 def test_repr(self):
1972 raw = self.BytesIO("hello".encode("utf-8"))
1973 b = self.BufferedReader(raw)
1974 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001975 modname = self.TextIOWrapper.__module__
1976 self.assertEqual(repr(t),
1977 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1978 raw.name = "dummy"
1979 self.assertEqual(repr(t),
1980 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001981 t.mode = "r"
1982 self.assertEqual(repr(t),
1983 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001984 raw.name = b"dummy"
1985 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001986 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001987
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001988 def test_line_buffering(self):
1989 r = self.BytesIO()
1990 b = self.BufferedWriter(r, 1000)
1991 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001992 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001993 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001994 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001995 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001996 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001997 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001998
Victor Stinnerf86a5e82012-06-05 13:43:22 +02001999 def test_default_encoding(self):
2000 old_environ = dict(os.environ)
2001 try:
2002 # try to get a user preferred encoding different than the current
2003 # locale encoding to check that TextIOWrapper() uses the current
2004 # locale encoding and not the user preferred encoding
2005 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2006 if key in os.environ:
2007 del os.environ[key]
2008
2009 current_locale_encoding = locale.getpreferredencoding(False)
2010 b = self.BytesIO()
2011 t = self.TextIOWrapper(b)
2012 self.assertEqual(t.encoding, current_locale_encoding)
2013 finally:
2014 os.environ.clear()
2015 os.environ.update(old_environ)
2016
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002017 @support.cpython_only
Serhiy Storchaka441d30f2013-01-19 12:26:26 +02002018 def test_device_encoding(self):
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002019 # Issue 15989
2020 import _testcapi
Serhiy Storchaka441d30f2013-01-19 12:26:26 +02002021 b = self.BytesIO()
2022 b.fileno = lambda: _testcapi.INT_MAX + 1
2023 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2024 b.fileno = lambda: _testcapi.UINT_MAX + 1
2025 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2026
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002027 def test_encoding(self):
2028 # Check the encoding attribute is always set, and valid
2029 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002030 t = self.TextIOWrapper(b, encoding="utf-8")
2031 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002032 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002033 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002034 codecs.lookup(t.encoding)
2035
2036 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002037 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002038 b = self.BytesIO(b"abc\n\xff\n")
2039 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002040 self.assertRaises(UnicodeError, t.read)
2041 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002042 b = self.BytesIO(b"abc\n\xff\n")
2043 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002044 self.assertRaises(UnicodeError, t.read)
2045 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002046 b = self.BytesIO(b"abc\n\xff\n")
2047 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002048 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002049 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002050 b = self.BytesIO(b"abc\n\xff\n")
2051 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002052 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002053
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002054 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002055 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002056 b = self.BytesIO()
2057 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002058 self.assertRaises(UnicodeError, t.write, "\xff")
2059 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002060 b = self.BytesIO()
2061 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002062 self.assertRaises(UnicodeError, t.write, "\xff")
2063 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002064 b = self.BytesIO()
2065 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002066 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002067 t.write("abc\xffdef\n")
2068 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002069 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002070 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002071 b = self.BytesIO()
2072 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002073 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002074 t.write("abc\xffdef\n")
2075 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002076 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002077
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002078 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002079 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2080
2081 tests = [
2082 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002083 [ '', input_lines ],
2084 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2085 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2086 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002087 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002088 encodings = (
2089 'utf-8', 'latin-1',
2090 'utf-16', 'utf-16-le', 'utf-16-be',
2091 'utf-32', 'utf-32-le', 'utf-32-be',
2092 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002093
Guido van Rossum8358db22007-08-18 21:39:55 +00002094 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002095 # character in TextIOWrapper._pending_line.
2096 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002097 # XXX: str.encode() should return bytes
2098 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002099 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002100 for bufsize in range(1, 10):
2101 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002102 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2103 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002104 encoding=encoding)
2105 if do_reads:
2106 got_lines = []
2107 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002108 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002109 if c2 == '':
2110 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002111 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002112 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002113 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002114 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002115
2116 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002117 self.assertEqual(got_line, exp_line)
2118 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002119
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002120 def test_newlines_input(self):
2121 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002122 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2123 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002124 (None, normalized.decode("ascii").splitlines(keepends=True)),
2125 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002126 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2127 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2128 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002129 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002130 buf = self.BytesIO(testdata)
2131 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002132 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002133 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002134 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002135
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002136 def test_newlines_output(self):
2137 testdict = {
2138 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2139 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2140 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2141 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2142 }
2143 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2144 for newline, expected in tests:
2145 buf = self.BytesIO()
2146 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2147 txt.write("AAA\nB")
2148 txt.write("BB\nCCC\n")
2149 txt.write("X\rY\r\nZ")
2150 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002151 self.assertEqual(buf.closed, False)
2152 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002153
2154 def test_destructor(self):
2155 l = []
2156 base = self.BytesIO
2157 class MyBytesIO(base):
2158 def close(self):
2159 l.append(self.getvalue())
2160 base.close(self)
2161 b = MyBytesIO()
2162 t = self.TextIOWrapper(b, encoding="ascii")
2163 t.write("abc")
2164 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002165 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002166 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002167
2168 def test_override_destructor(self):
2169 record = []
2170 class MyTextIO(self.TextIOWrapper):
2171 def __del__(self):
2172 record.append(1)
2173 try:
2174 f = super().__del__
2175 except AttributeError:
2176 pass
2177 else:
2178 f()
2179 def close(self):
2180 record.append(2)
2181 super().close()
2182 def flush(self):
2183 record.append(3)
2184 super().flush()
2185 b = self.BytesIO()
2186 t = MyTextIO(b, encoding="ascii")
2187 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002188 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002189 self.assertEqual(record, [1, 2, 3])
2190
2191 def test_error_through_destructor(self):
2192 # Test that the exception state is not modified by a destructor,
2193 # even if close() fails.
2194 rawio = self.CloseFailureIO()
2195 def f():
2196 self.TextIOWrapper(rawio).xyzzy
2197 with support.captured_output("stderr") as s:
2198 self.assertRaises(AttributeError, f)
2199 s = s.getvalue().strip()
2200 if s:
2201 # The destructor *may* have printed an unraisable error, check it
2202 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002203 self.assertTrue(s.startswith("Exception IOError: "), s)
2204 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002205
Guido van Rossum9b76da62007-04-11 01:09:03 +00002206 # Systematic tests of the text I/O API
2207
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002208 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002209 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002210 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002211 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002212 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002213 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002214 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002215 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002216 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002217 self.assertEqual(f.tell(), 0)
2218 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002219 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002220 self.assertEqual(f.seek(0), 0)
2221 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002222 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002223 self.assertEqual(f.read(2), "ab")
2224 self.assertEqual(f.read(1), "c")
2225 self.assertEqual(f.read(1), "")
2226 self.assertEqual(f.read(), "")
2227 self.assertEqual(f.tell(), cookie)
2228 self.assertEqual(f.seek(0), 0)
2229 self.assertEqual(f.seek(0, 2), cookie)
2230 self.assertEqual(f.write("def"), 3)
2231 self.assertEqual(f.seek(cookie), cookie)
2232 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002233 if enc.startswith("utf"):
2234 self.multi_line_test(f, enc)
2235 f.close()
2236
2237 def multi_line_test(self, f, enc):
2238 f.seek(0)
2239 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002240 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002241 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002242 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002243 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002244 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002245 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002246 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002247 wlines.append((f.tell(), line))
2248 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002249 f.seek(0)
2250 rlines = []
2251 while True:
2252 pos = f.tell()
2253 line = f.readline()
2254 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002255 break
2256 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002257 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002258
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002259 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002260 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002261 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002262 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002263 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002264 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002265 p2 = f.tell()
2266 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002267 self.assertEqual(f.tell(), p0)
2268 self.assertEqual(f.readline(), "\xff\n")
2269 self.assertEqual(f.tell(), p1)
2270 self.assertEqual(f.readline(), "\xff\n")
2271 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002272 f.seek(0)
2273 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002274 self.assertEqual(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002275 self.assertRaises(IOError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002276 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002277 f.close()
2278
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002279 def test_seeking(self):
2280 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002281 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002282 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002283 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002284 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002285 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002286 suffix = bytes(u_suffix.encode("utf-8"))
2287 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002288 with self.open(support.TESTFN, "wb") as f:
2289 f.write(line*2)
2290 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2291 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002292 self.assertEqual(s, str(prefix, "ascii"))
2293 self.assertEqual(f.tell(), prefix_size)
2294 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002295
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002296 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002297 # Regression test for a specific bug
2298 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002299 with self.open(support.TESTFN, "wb") as f:
2300 f.write(data)
2301 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2302 f._CHUNK_SIZE # Just test that it exists
2303 f._CHUNK_SIZE = 2
2304 f.readline()
2305 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002306
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002307 def test_seek_and_tell(self):
2308 #Test seek/tell using the StatefulIncrementalDecoder.
2309 # Make test faster by doing smaller seeks
2310 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002311
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002312 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002313 """Tell/seek to various points within a data stream and ensure
2314 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002315 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002316 f.write(data)
2317 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002318 f = self.open(support.TESTFN, encoding='test_decoder')
2319 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002320 decoded = f.read()
2321 f.close()
2322
Neal Norwitze2b07052008-03-18 19:52:05 +00002323 for i in range(min_pos, len(decoded) + 1): # seek positions
2324 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002325 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002326 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002327 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002328 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002329 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002330 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002331 f.close()
2332
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002333 # Enable the test decoder.
2334 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002335
2336 # Run the tests.
2337 try:
2338 # Try each test case.
2339 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002340 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002341
2342 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002343 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2344 offset = CHUNK_SIZE - len(input)//2
2345 prefix = b'.'*offset
2346 # Don't bother seeking into the prefix (takes too long).
2347 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002348 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002349
2350 # Ensure our test decoder won't interfere with subsequent tests.
2351 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002352 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002353
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002354 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002355 data = "1234567890"
2356 tests = ("utf-16",
2357 "utf-16-le",
2358 "utf-16-be",
2359 "utf-32",
2360 "utf-32-le",
2361 "utf-32-be")
2362 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002363 buf = self.BytesIO()
2364 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002365 # Check if the BOM is written only once (see issue1753).
2366 f.write(data)
2367 f.write(data)
2368 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002369 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002370 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002371 self.assertEqual(f.read(), data * 2)
2372 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002373
Benjamin Petersona1b49012009-03-31 23:11:32 +00002374 def test_unreadable(self):
2375 class UnReadable(self.BytesIO):
2376 def readable(self):
2377 return False
2378 txt = self.TextIOWrapper(UnReadable())
2379 self.assertRaises(IOError, txt.read)
2380
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002381 def test_read_one_by_one(self):
2382 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002383 reads = ""
2384 while True:
2385 c = txt.read(1)
2386 if not c:
2387 break
2388 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002389 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002390
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002391 def test_readlines(self):
2392 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2393 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2394 txt.seek(0)
2395 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2396 txt.seek(0)
2397 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2398
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002399 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002400 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002401 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002402 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002403 reads = ""
2404 while True:
2405 c = txt.read(128)
2406 if not c:
2407 break
2408 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002409 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002410
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002411 def test_writelines(self):
2412 l = ['ab', 'cd', 'ef']
2413 buf = self.BytesIO()
2414 txt = self.TextIOWrapper(buf)
2415 txt.writelines(l)
2416 txt.flush()
2417 self.assertEqual(buf.getvalue(), b'abcdef')
2418
2419 def test_writelines_userlist(self):
2420 l = UserList(['ab', 'cd', 'ef'])
2421 buf = self.BytesIO()
2422 txt = self.TextIOWrapper(buf)
2423 txt.writelines(l)
2424 txt.flush()
2425 self.assertEqual(buf.getvalue(), b'abcdef')
2426
2427 def test_writelines_error(self):
2428 txt = self.TextIOWrapper(self.BytesIO())
2429 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2430 self.assertRaises(TypeError, txt.writelines, None)
2431 self.assertRaises(TypeError, txt.writelines, b'abc')
2432
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002433 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002434 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002435
2436 # read one char at a time
2437 reads = ""
2438 while True:
2439 c = txt.read(1)
2440 if not c:
2441 break
2442 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002443 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002444
2445 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002446 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002447 txt._CHUNK_SIZE = 4
2448
2449 reads = ""
2450 while True:
2451 c = txt.read(4)
2452 if not c:
2453 break
2454 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002455 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002456
2457 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002458 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002459 txt._CHUNK_SIZE = 4
2460
2461 reads = txt.read(4)
2462 reads += txt.read(4)
2463 reads += txt.readline()
2464 reads += txt.readline()
2465 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002466 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002467
2468 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002469 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002470 txt._CHUNK_SIZE = 4
2471
2472 reads = txt.read(4)
2473 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002474 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002475
2476 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002477 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002478 txt._CHUNK_SIZE = 4
2479
2480 reads = txt.read(4)
2481 pos = txt.tell()
2482 txt.seek(0)
2483 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002484 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002485
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002486 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002487 buffer = self.BytesIO(self.testdata)
2488 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002489
2490 self.assertEqual(buffer.seekable(), txt.seekable())
2491
Antoine Pitroue4501852009-05-14 18:55:55 +00002492 def test_append_bom(self):
2493 # The BOM is not written again when appending to a non-empty file
2494 filename = support.TESTFN
2495 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2496 with self.open(filename, 'w', encoding=charset) as f:
2497 f.write('aaa')
2498 pos = f.tell()
2499 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002500 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002501
2502 with self.open(filename, 'a', encoding=charset) as f:
2503 f.write('xxx')
2504 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002505 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002506
2507 def test_seek_bom(self):
2508 # Same test, but when seeking manually
2509 filename = support.TESTFN
2510 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2511 with self.open(filename, 'w', encoding=charset) as f:
2512 f.write('aaa')
2513 pos = f.tell()
2514 with self.open(filename, 'r+', encoding=charset) as f:
2515 f.seek(pos)
2516 f.write('zzz')
2517 f.seek(0)
2518 f.write('bbb')
2519 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002520 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002521
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002522 def test_errors_property(self):
2523 with self.open(support.TESTFN, "w") as f:
2524 self.assertEqual(f.errors, "strict")
2525 with self.open(support.TESTFN, "w", errors="replace") as f:
2526 self.assertEqual(f.errors, "replace")
2527
Brett Cannon31f59292011-02-21 19:29:56 +00002528 @support.no_tracing
Victor Stinner45df8202010-04-28 22:31:17 +00002529 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002530 def test_threads_write(self):
2531 # Issue6750: concurrent writes could duplicate data
2532 event = threading.Event()
2533 with self.open(support.TESTFN, "w", buffering=1) as f:
2534 def run(n):
2535 text = "Thread%03d\n" % n
2536 event.wait()
2537 f.write(text)
2538 threads = [threading.Thread(target=lambda n=x: run(n))
2539 for x in range(20)]
2540 for t in threads:
2541 t.start()
2542 time.sleep(0.02)
2543 event.set()
2544 for t in threads:
2545 t.join()
2546 with self.open(support.TESTFN) as f:
2547 content = f.read()
2548 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002549 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002550
Antoine Pitrou6be88762010-05-03 16:48:20 +00002551 def test_flush_error_on_close(self):
2552 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2553 def bad_flush():
2554 raise IOError()
2555 txt.flush = bad_flush
2556 self.assertRaises(IOError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06002557 self.assertTrue(txt.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00002558
2559 def test_multi_close(self):
2560 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2561 txt.close()
2562 txt.close()
2563 txt.close()
2564 self.assertRaises(ValueError, txt.flush)
2565
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002566 def test_unseekable(self):
2567 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2568 self.assertRaises(self.UnsupportedOperation, txt.tell)
2569 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2570
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002571 def test_readonly_attributes(self):
2572 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2573 buf = self.BytesIO(self.testdata)
2574 with self.assertRaises(AttributeError):
2575 txt.buffer = buf
2576
Antoine Pitroue96ec682011-07-23 21:46:35 +02002577 def test_rawio(self):
2578 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
2579 # that subprocess.Popen() can have the required unbuffered
2580 # semantics with universal_newlines=True.
2581 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2582 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
2583 # Reads
2584 self.assertEqual(txt.read(4), 'abcd')
2585 self.assertEqual(txt.readline(), 'efghi\n')
2586 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
2587
2588 def test_rawio_write_through(self):
2589 # Issue #12591: with write_through=True, writes don't need a flush
2590 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2591 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
2592 write_through=True)
2593 txt.write('1')
2594 txt.write('23\n4')
2595 txt.write('5')
2596 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2597
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002598 def test_read_nonbytes(self):
2599 # Issue #17106
2600 # Crash when underlying read() returns non-bytes
2601 t = self.TextIOWrapper(self.StringIO('a'))
2602 self.assertRaises(TypeError, t.read, 1)
2603 t = self.TextIOWrapper(self.StringIO('a'))
2604 self.assertRaises(TypeError, t.readline)
2605 t = self.TextIOWrapper(self.StringIO('a'))
2606 self.assertRaises(TypeError, t.read)
2607
2608 def test_illegal_decoder(self):
2609 # Issue #17106
2610 # Crash when decoder returns non-string
2611 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2612 encoding='quopri_codec')
2613 self.assertRaises(TypeError, t.read, 1)
2614 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2615 encoding='quopri_codec')
2616 self.assertRaises(TypeError, t.readline)
2617 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2618 encoding='quopri_codec')
2619 self.assertRaises(TypeError, t.read)
2620
2621
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002622class CTextIOWrapperTest(TextIOWrapperTest):
2623
2624 def test_initialization(self):
2625 r = self.BytesIO(b"\xc3\xa9\n\n")
2626 b = self.BufferedReader(r, 1000)
2627 t = self.TextIOWrapper(b)
2628 self.assertRaises(TypeError, t.__init__, b, newline=42)
2629 self.assertRaises(ValueError, t.read)
2630 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2631 self.assertRaises(ValueError, t.read)
2632
2633 def test_garbage_collection(self):
2634 # C TextIOWrapper objects are collected, and collecting them flushes
2635 # all data to disk.
2636 # The Python version has __del__, so it ends in gc.garbage instead.
2637 rawio = io.FileIO(support.TESTFN, "wb")
2638 b = self.BufferedWriter(rawio)
2639 t = self.TextIOWrapper(b, encoding="ascii")
2640 t.write("456def")
2641 t.x = t
2642 wr = weakref.ref(t)
2643 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002644 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002645 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002646 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002647 self.assertEqual(f.read(), b"456def")
2648
Charles-François Natali42c28cd2011-10-05 19:53:43 +02002649 def test_rwpair_cleared_before_textio(self):
2650 # Issue 13070: TextIOWrapper's finalization would crash when called
2651 # after the reference to the underlying BufferedRWPair's writer got
2652 # cleared by the GC.
2653 for i in range(1000):
2654 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2655 t1 = self.TextIOWrapper(b1, encoding="ascii")
2656 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2657 t2 = self.TextIOWrapper(b2, encoding="ascii")
2658 # circular references
2659 t1.buddy = t2
2660 t2.buddy = t1
2661 support.gc_collect()
2662
2663
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002664class PyTextIOWrapperTest(TextIOWrapperTest):
2665 pass
2666
2667
2668class IncrementalNewlineDecoderTest(unittest.TestCase):
2669
2670 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002671 # UTF-8 specific tests for a newline decoder
2672 def _check_decode(b, s, **kwargs):
2673 # We exercise getstate() / setstate() as well as decode()
2674 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002675 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002676 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002677 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002678
Antoine Pitrou180a3362008-12-14 16:36:46 +00002679 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002680
Antoine Pitrou180a3362008-12-14 16:36:46 +00002681 _check_decode(b'\xe8', "")
2682 _check_decode(b'\xa2', "")
2683 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002684
Antoine Pitrou180a3362008-12-14 16:36:46 +00002685 _check_decode(b'\xe8', "")
2686 _check_decode(b'\xa2', "")
2687 _check_decode(b'\x88', "\u8888")
2688
2689 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002690 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2691
Antoine Pitrou180a3362008-12-14 16:36:46 +00002692 decoder.reset()
2693 _check_decode(b'\n', "\n")
2694 _check_decode(b'\r', "")
2695 _check_decode(b'', "\n", final=True)
2696 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002697
Antoine Pitrou180a3362008-12-14 16:36:46 +00002698 _check_decode(b'\r', "")
2699 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002700
Antoine Pitrou180a3362008-12-14 16:36:46 +00002701 _check_decode(b'\r\r\n', "\n\n")
2702 _check_decode(b'\r', "")
2703 _check_decode(b'\r', "\n")
2704 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002705
Antoine Pitrou180a3362008-12-14 16:36:46 +00002706 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2707 _check_decode(b'\xe8\xa2\x88', "\u8888")
2708 _check_decode(b'\n', "\n")
2709 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2710 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002711
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002712 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002713 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002714 if encoding is not None:
2715 encoder = codecs.getincrementalencoder(encoding)()
2716 def _decode_bytewise(s):
2717 # Decode one byte at a time
2718 for b in encoder.encode(s):
2719 result.append(decoder.decode(bytes([b])))
2720 else:
2721 encoder = None
2722 def _decode_bytewise(s):
2723 # Decode one char at a time
2724 for c in s:
2725 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002726 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002727 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002728 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00002729 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002730 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002731 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002732 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002733 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002734 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002735 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002736 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002737 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002738 input = "abc"
2739 if encoder is not None:
2740 encoder.reset()
2741 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002742 self.assertEqual(decoder.decode(input), "abc")
2743 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002744
2745 def test_newline_decoder(self):
2746 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002747 # None meaning the IncrementalNewlineDecoder takes unicode input
2748 # rather than bytes input
2749 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002750 'utf-16', 'utf-16-le', 'utf-16-be',
2751 'utf-32', 'utf-32-le', 'utf-32-be',
2752 )
2753 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002754 decoder = enc and codecs.getincrementaldecoder(enc)()
2755 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2756 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002757 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002758 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2759 self.check_newline_decoding_utf8(decoder)
2760
Antoine Pitrou66913e22009-03-06 23:40:56 +00002761 def test_newline_bytes(self):
2762 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2763 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002764 self.assertEqual(dec.newlines, None)
2765 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2766 self.assertEqual(dec.newlines, None)
2767 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2768 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00002769 dec = self.IncrementalNewlineDecoder(None, translate=False)
2770 _check(dec)
2771 dec = self.IncrementalNewlineDecoder(None, translate=True)
2772 _check(dec)
2773
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002774class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2775 pass
2776
2777class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2778 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002779
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002780
Guido van Rossum01a27522007-03-07 01:00:12 +00002781# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002782
Guido van Rossum5abbf752007-08-27 17:39:33 +00002783class MiscIOTest(unittest.TestCase):
2784
Barry Warsaw40e82462008-11-20 20:14:50 +00002785 def tearDown(self):
2786 support.unlink(support.TESTFN)
2787
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002788 def test___all__(self):
2789 for name in self.io.__all__:
2790 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002791 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002792 if name == "open":
2793 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002794 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002795 self.assertTrue(issubclass(obj, Exception), name)
2796 elif not name.startswith("SEEK_"):
2797 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002798
Barry Warsaw40e82462008-11-20 20:14:50 +00002799 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002800 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002801 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002802 f.close()
2803
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002804 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002805 self.assertEqual(f.name, support.TESTFN)
2806 self.assertEqual(f.buffer.name, support.TESTFN)
2807 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2808 self.assertEqual(f.mode, "U")
2809 self.assertEqual(f.buffer.mode, "rb")
2810 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002811 f.close()
2812
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002813 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002814 self.assertEqual(f.mode, "w+")
2815 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2816 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002817
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002818 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002819 self.assertEqual(g.mode, "wb")
2820 self.assertEqual(g.raw.mode, "wb")
2821 self.assertEqual(g.name, f.fileno())
2822 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00002823 f.close()
2824 g.close()
2825
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002826 def test_io_after_close(self):
2827 for kwargs in [
2828 {"mode": "w"},
2829 {"mode": "wb"},
2830 {"mode": "w", "buffering": 1},
2831 {"mode": "w", "buffering": 2},
2832 {"mode": "wb", "buffering": 0},
2833 {"mode": "r"},
2834 {"mode": "rb"},
2835 {"mode": "r", "buffering": 1},
2836 {"mode": "r", "buffering": 2},
2837 {"mode": "rb", "buffering": 0},
2838 {"mode": "w+"},
2839 {"mode": "w+b"},
2840 {"mode": "w+", "buffering": 1},
2841 {"mode": "w+", "buffering": 2},
2842 {"mode": "w+b", "buffering": 0},
2843 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002844 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002845 f.close()
2846 self.assertRaises(ValueError, f.flush)
2847 self.assertRaises(ValueError, f.fileno)
2848 self.assertRaises(ValueError, f.isatty)
2849 self.assertRaises(ValueError, f.__iter__)
2850 if hasattr(f, "peek"):
2851 self.assertRaises(ValueError, f.peek, 1)
2852 self.assertRaises(ValueError, f.read)
2853 if hasattr(f, "read1"):
2854 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02002855 if hasattr(f, "readall"):
2856 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002857 if hasattr(f, "readinto"):
2858 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2859 self.assertRaises(ValueError, f.readline)
2860 self.assertRaises(ValueError, f.readlines)
2861 self.assertRaises(ValueError, f.seek, 0)
2862 self.assertRaises(ValueError, f.tell)
2863 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002864 self.assertRaises(ValueError, f.write,
2865 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002866 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002867 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002868
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002869 def test_blockingioerror(self):
2870 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002871 class C(str):
2872 pass
2873 c = C("")
2874 b = self.BlockingIOError(1, c)
2875 c.b = b
2876 b.c = c
2877 wr = weakref.ref(c)
2878 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002879 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002880 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002881
2882 def test_abcs(self):
2883 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002884 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2885 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2886 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2887 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002888
2889 def _check_abc_inheritance(self, abcmodule):
2890 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002891 self.assertIsInstance(f, abcmodule.IOBase)
2892 self.assertIsInstance(f, abcmodule.RawIOBase)
2893 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2894 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002895 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002896 self.assertIsInstance(f, abcmodule.IOBase)
2897 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2898 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2899 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002900 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002901 self.assertIsInstance(f, abcmodule.IOBase)
2902 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2903 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2904 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002905
2906 def test_abc_inheritance(self):
2907 # Test implementations inherit from their respective ABCs
2908 self._check_abc_inheritance(self)
2909
2910 def test_abc_inheritance_official(self):
2911 # Test implementations inherit from the official ABCs of the
2912 # baseline "io" module.
2913 self._check_abc_inheritance(io)
2914
Antoine Pitroue033e062010-10-29 10:38:18 +00002915 def _check_warn_on_dealloc(self, *args, **kwargs):
2916 f = open(*args, **kwargs)
2917 r = repr(f)
2918 with self.assertWarns(ResourceWarning) as cm:
2919 f = None
2920 support.gc_collect()
2921 self.assertIn(r, str(cm.warning.args[0]))
2922
2923 def test_warn_on_dealloc(self):
2924 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
2925 self._check_warn_on_dealloc(support.TESTFN, "wb")
2926 self._check_warn_on_dealloc(support.TESTFN, "w")
2927
2928 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
2929 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00002930 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00002931 for fd in fds:
2932 try:
2933 os.close(fd)
2934 except EnvironmentError as e:
2935 if e.errno != errno.EBADF:
2936 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00002937 self.addCleanup(cleanup_fds)
2938 r, w = os.pipe()
2939 fds += r, w
2940 self._check_warn_on_dealloc(r, *args, **kwargs)
2941 # When using closefd=False, there's no warning
2942 r, w = os.pipe()
2943 fds += r, w
2944 with warnings.catch_warnings(record=True) as recorded:
2945 open(r, *args, closefd=False, **kwargs)
2946 support.gc_collect()
2947 self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00002948
2949 def test_warn_on_dealloc_fd(self):
2950 self._check_warn_on_dealloc_fd("rb", buffering=0)
2951 self._check_warn_on_dealloc_fd("rb")
2952 self._check_warn_on_dealloc_fd("r")
2953
2954
Antoine Pitrou243757e2010-11-05 21:15:39 +00002955 def test_pickling(self):
2956 # Pickling file objects is forbidden
2957 for kwargs in [
2958 {"mode": "w"},
2959 {"mode": "wb"},
2960 {"mode": "wb", "buffering": 0},
2961 {"mode": "r"},
2962 {"mode": "rb"},
2963 {"mode": "rb", "buffering": 0},
2964 {"mode": "w+"},
2965 {"mode": "w+b"},
2966 {"mode": "w+b", "buffering": 0},
2967 ]:
2968 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
2969 with self.open(support.TESTFN, **kwargs) as f:
2970 self.assertRaises(TypeError, pickle.dumps, f, protocol)
2971
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01002972 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2973 def test_nonblock_pipe_write_bigbuf(self):
2974 self._test_nonblock_pipe_write(16*1024)
2975
2976 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2977 def test_nonblock_pipe_write_smallbuf(self):
2978 self._test_nonblock_pipe_write(1024)
2979
2980 def _set_non_blocking(self, fd):
2981 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
2982 self.assertNotEqual(flags, -1)
2983 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
2984 self.assertEqual(res, 0)
2985
2986 def _test_nonblock_pipe_write(self, bufsize):
2987 sent = []
2988 received = []
2989 r, w = os.pipe()
2990 self._set_non_blocking(r)
2991 self._set_non_blocking(w)
2992
2993 # To exercise all code paths in the C implementation we need
2994 # to play with buffer sizes. For instance, if we choose a
2995 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
2996 # then we will never get a partial write of the buffer.
2997 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
2998 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
2999
3000 with rf, wf:
3001 for N in 9999, 73, 7574:
3002 try:
3003 i = 0
3004 while True:
3005 msg = bytes([i % 26 + 97]) * N
3006 sent.append(msg)
3007 wf.write(msg)
3008 i += 1
3009
3010 except self.BlockingIOError as e:
3011 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003012 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003013 sent[-1] = sent[-1][:e.characters_written]
3014 received.append(rf.read())
3015 msg = b'BLOCKED'
3016 wf.write(msg)
3017 sent.append(msg)
3018
3019 while True:
3020 try:
3021 wf.flush()
3022 break
3023 except self.BlockingIOError as e:
3024 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003025 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003026 self.assertEqual(e.characters_written, 0)
3027 received.append(rf.read())
3028
3029 received += iter(rf.read, None)
3030
3031 sent, received = b''.join(sent), b''.join(received)
3032 self.assertTrue(sent == received)
3033 self.assertTrue(wf.closed)
3034 self.assertTrue(rf.closed)
3035
Charles-François Natalidc3044c2012-01-09 22:40:02 +01003036 def test_create_fail(self):
3037 # 'x' mode fails if file is existing
3038 with self.open(support.TESTFN, 'w'):
3039 pass
3040 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
3041
3042 def test_create_writes(self):
3043 # 'x' mode opens for writing
3044 with self.open(support.TESTFN, 'xb') as f:
3045 f.write(b"spam")
3046 with self.open(support.TESTFN, 'rb') as f:
3047 self.assertEqual(b"spam", f.read())
3048
Christian Heimes7b648752012-09-10 14:48:43 +02003049 def test_open_allargs(self):
3050 # there used to be a buffer overflow in the parser for rawmode
3051 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
3052
3053
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003054class CMiscIOTest(MiscIOTest):
3055 io = io
3056
Serhiy Storchaka37a79a12013-05-28 16:24:45 +03003057 def test_readinto_buffer_overflow(self):
3058 # Issue #18025
3059 class BadReader(self.io.BufferedIOBase):
3060 def read(self, n=-1):
3061 return b'x' * 10**6
3062 bufio = BadReader()
3063 b = bytearray(2)
3064 self.assertRaises(ValueError, bufio.readinto, b)
3065
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003066class PyMiscIOTest(MiscIOTest):
3067 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003068
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003069
3070@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3071class SignalsTest(unittest.TestCase):
3072
3073 def setUp(self):
3074 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
3075
3076 def tearDown(self):
3077 signal.signal(signal.SIGALRM, self.oldalrm)
3078
3079 def alarm_interrupt(self, sig, frame):
3080 1/0
3081
3082 @unittest.skipUnless(threading, 'Threading required for this test.')
3083 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
3084 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00003085 invokes the signal handler, and bubbles up the exception raised
3086 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003087 read_results = []
3088 def _read():
Victor Stinnera9293352011-04-30 15:21:58 +02003089 if hasattr(signal, 'pthread_sigmask'):
3090 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003091 s = os.read(r, 1)
3092 read_results.append(s)
3093 t = threading.Thread(target=_read)
3094 t.daemon = True
3095 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00003096 fdopen_kwargs["closefd"] = False
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003097 try:
3098 wio = self.io.open(w, **fdopen_kwargs)
3099 t.start()
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003100 signal.alarm(1)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003101 # Fill the pipe enough that the write will be blocking.
3102 # It will be interrupted by the timer armed above. Since the
3103 # other thread has read one byte, the low-level write will
3104 # return with a successful (partial) result rather than an EINTR.
3105 # The buffered IO layer must check for pending signal
3106 # handlers, which in this case will invoke alarm_interrupt().
3107 self.assertRaises(ZeroDivisionError,
Antoine Pitroue1a16742013-04-24 23:31:38 +02003108 wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003109 t.join()
3110 # We got one byte, get another one and check that it isn't a
3111 # repeat of the first one.
3112 read_results.append(os.read(r, 1))
3113 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3114 finally:
3115 os.close(w)
3116 os.close(r)
3117 # This is deliberate. If we didn't close the file descriptor
3118 # before closing wio, wio would try to flush its internal
3119 # buffer, and block again.
3120 try:
3121 wio.close()
3122 except IOError as e:
3123 if e.errno != errno.EBADF:
3124 raise
3125
3126 def test_interrupted_write_unbuffered(self):
3127 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3128
3129 def test_interrupted_write_buffered(self):
3130 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3131
3132 def test_interrupted_write_text(self):
3133 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3134
Brett Cannon31f59292011-02-21 19:29:56 +00003135 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003136 def check_reentrant_write(self, data, **fdopen_kwargs):
3137 def on_alarm(*args):
3138 # Will be called reentrantly from the same thread
3139 wio.write(data)
3140 1/0
3141 signal.signal(signal.SIGALRM, on_alarm)
3142 r, w = os.pipe()
3143 wio = self.io.open(w, **fdopen_kwargs)
3144 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003145 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003146 # Either the reentrant call to wio.write() fails with RuntimeError,
3147 # or the signal handler raises ZeroDivisionError.
3148 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3149 while 1:
3150 for i in range(100):
3151 wio.write(data)
3152 wio.flush()
3153 # Make sure the buffer doesn't fill up and block further writes
3154 os.read(r, len(data) * 100)
3155 exc = cm.exception
3156 if isinstance(exc, RuntimeError):
3157 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3158 finally:
3159 wio.close()
3160 os.close(r)
3161
3162 def test_reentrant_write_buffered(self):
3163 self.check_reentrant_write(b"xy", mode="wb")
3164
3165 def test_reentrant_write_text(self):
3166 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3167
Antoine Pitrou707ce822011-02-25 21:24:11 +00003168 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3169 """Check that a buffered read, when it gets interrupted (either
3170 returning a partial result or EINTR), properly invokes the signal
3171 handler and retries if the latter returned successfully."""
3172 r, w = os.pipe()
3173 fdopen_kwargs["closefd"] = False
3174 def alarm_handler(sig, frame):
3175 os.write(w, b"bar")
3176 signal.signal(signal.SIGALRM, alarm_handler)
3177 try:
3178 rio = self.io.open(r, **fdopen_kwargs)
3179 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003180 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003181 # Expected behaviour:
3182 # - first raw read() returns partial b"foo"
3183 # - second raw read() returns EINTR
3184 # - third raw read() returns b"bar"
3185 self.assertEqual(decode(rio.read(6)), "foobar")
3186 finally:
3187 rio.close()
3188 os.close(w)
3189 os.close(r)
3190
Antoine Pitrou20db5112011-08-19 20:32:34 +02003191 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003192 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3193 mode="rb")
3194
Antoine Pitrou20db5112011-08-19 20:32:34 +02003195 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003196 self.check_interrupted_read_retry(lambda x: x,
3197 mode="r")
3198
3199 @unittest.skipUnless(threading, 'Threading required for this test.')
3200 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3201 """Check that a buffered write, when it gets interrupted (either
3202 returning a partial result or EINTR), properly invokes the signal
3203 handler and retries if the latter returned successfully."""
3204 select = support.import_module("select")
3205 # A quantity that exceeds the buffer size of an anonymous pipe's
3206 # write end.
Antoine Pitroue1a16742013-04-24 23:31:38 +02003207 N = support.PIPE_MAX_SIZE
Antoine Pitrou707ce822011-02-25 21:24:11 +00003208 r, w = os.pipe()
3209 fdopen_kwargs["closefd"] = False
3210 # We need a separate thread to read from the pipe and allow the
3211 # write() to finish. This thread is started after the SIGALRM is
3212 # received (forcing a first EINTR in write()).
3213 read_results = []
3214 write_finished = False
3215 def _read():
3216 while not write_finished:
3217 while r in select.select([r], [], [], 1.0)[0]:
3218 s = os.read(r, 1024)
3219 read_results.append(s)
3220 t = threading.Thread(target=_read)
3221 t.daemon = True
3222 def alarm1(sig, frame):
3223 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003224 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003225 def alarm2(sig, frame):
3226 t.start()
3227 signal.signal(signal.SIGALRM, alarm1)
3228 try:
3229 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003230 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003231 # Expected behaviour:
3232 # - first raw write() is partial (because of the limited pipe buffer
3233 # and the first alarm)
3234 # - second raw write() returns EINTR (because of the second alarm)
3235 # - subsequent write()s are successful (either partial or complete)
3236 self.assertEqual(N, wio.write(item * N))
3237 wio.flush()
3238 write_finished = True
3239 t.join()
3240 self.assertEqual(N, sum(len(x) for x in read_results))
3241 finally:
3242 write_finished = True
3243 os.close(w)
3244 os.close(r)
3245 # This is deliberate. If we didn't close the file descriptor
3246 # before closing wio, wio would try to flush its internal
3247 # buffer, and could block (in case of failure).
3248 try:
3249 wio.close()
3250 except IOError as e:
3251 if e.errno != errno.EBADF:
3252 raise
3253
Antoine Pitrou20db5112011-08-19 20:32:34 +02003254 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003255 self.check_interrupted_write_retry(b"x", mode="wb")
3256
Antoine Pitrou20db5112011-08-19 20:32:34 +02003257 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003258 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3259
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003260
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003261class CSignalsTest(SignalsTest):
3262 io = io
3263
3264class PySignalsTest(SignalsTest):
3265 io = pyio
3266
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003267 # Handling reentrancy issues would slow down _pyio even more, so the
3268 # tests are disabled.
3269 test_reentrant_write_buffered = None
3270 test_reentrant_write_text = None
3271
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003272
Ezio Melottidaa42c72013-03-23 16:30:16 +02003273def load_tests(*args):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003274 tests = (CIOTest, PyIOTest,
3275 CBufferedReaderTest, PyBufferedReaderTest,
3276 CBufferedWriterTest, PyBufferedWriterTest,
3277 CBufferedRWPairTest, PyBufferedRWPairTest,
3278 CBufferedRandomTest, PyBufferedRandomTest,
3279 StatefulIncrementalDecoderTest,
3280 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3281 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003282 CMiscIOTest, PyMiscIOTest,
3283 CSignalsTest, PySignalsTest,
3284 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003285
3286 # Put the namespaces of the IO module we are testing and some useful mock
3287 # classes in the __dict__ of each test.
3288 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou328ec742010-09-14 18:37:24 +00003289 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003290 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3291 c_io_ns = {name : getattr(io, name) for name in all_members}
3292 py_io_ns = {name : getattr(pyio, name) for name in all_members}
3293 globs = globals()
3294 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3295 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3296 # Avoid turning open into a bound method.
3297 py_io_ns["open"] = pyio.OpenWrapper
3298 for test in tests:
3299 if test.__name__.startswith("C"):
3300 for name, obj in c_io_ns.items():
3301 setattr(test, name, obj)
3302 elif test.__name__.startswith("Py"):
3303 for name, obj in py_io_ns.items():
3304 setattr(test, name, obj)
3305
Ezio Melottidaa42c72013-03-23 16:30:16 +02003306 suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests])
3307 return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00003308
3309if __name__ == "__main__":
Ezio Melottidaa42c72013-03-23 16:30:16 +02003310 unittest.main()