blob: ad86301fcd29492b105020bd4b7db3c0f188c144 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Antoine Pitrou712cb732013-12-21 15:51:54 +010038from test.script_helper import assert_python_ok
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000047
def _default_chunk_size():
    """Return the chunk size a freshly opened text-mode file reports."""
    # Any readable text file works as a probe; this source file is used.
    with open(__file__, "r", encoding="latin-1") as probe:
        chunk_size = probe._CHUNK_SIZE
    return chunk_size
52
53
class MockRawIOWithoutRead:
    """Scripted raw stream that deliberately has no read() method.

    Only readinto() is provided, so as to exercise the default
    RawIO.read() which calls readinto().

    ``read_stack`` is an iterable of chunks that readinto() hands out in
    order; a ``None`` entry makes one readinto() call return None.
    Everything passed to write() is recorded in ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        self._reads = 0             # number of readinto()/read() calls
        self._extraneous_reads = 0  # calls made after the script ran dry

    def write(self, b):
        # Keep an immutable snapshot of the caller's buffer.
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the next scripted chunk (or as much as fits) into ``buf``.

        Returns the number of bytes copied, None for a scripted ``None``
        entry, or 0 once the script is exhausted.
        """
        self._reads += 1
        capacity = len(buf)
        if not self._read_stack:
            self._extraneous_reads += 1
            return 0
        chunk = self._read_stack[0]
        if chunk is None:
            self._read_stack.pop(0)
            return None
        size = len(chunk)
        if size > capacity:
            # Fill the buffer and keep the remainder for the next call.
            buf[:] = chunk[:capacity]
            self._read_stack[0] = chunk[capacity:]
            return capacity
        self._read_stack.pop(0)
        buf[:size] = chunk
        return size

    def truncate(self, pos=None):
        return pos
109
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead on top of the C implementation's RawIOBase."""


class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead on top of the pure-Python RawIOBase."""
115
116
class MockRawIO(MockRawIOWithoutRead):
    """Scripted raw stream that additionally provides its own read()."""

    def read(self, n=None):
        """Pop and return the next scripted chunk; ``n`` is ignored.

        Once the script is exhausted the call is counted in
        ``_extraneous_reads`` and ``b""`` is returned.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted script is expected here.  A bare ``except``
            # would also swallow KeyboardInterrupt and genuine bugs.
            self._extraneous_reads += 1
            return b""
126
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO on top of the C implementation's RawIOBase."""


class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO on top of the pure-Python RawIOBase."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000132
Guido van Rossuma9e20242007-03-08 00:43:48 +0000133
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods report inconsistent results
    (doubled counts and data, negative positions)."""

    def write(self, b):
        # Report twice the count returned by the parent write().
        reported = super().write(b)
        return reported * 2

    def read(self, n=None):
        # Return the parent's chunk repeated twice.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Let the parent fill the buffer, then report five times its length.
        super().readinto(buf)
        return 5 * len(buf)
150
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO on top of the C implementation's RawIOBase."""


class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO on top of the pure-Python RawIOBase."""
156
157
class CloseFailureIO(MockRawIO):
    """MockRawIO variant whose first close() call raises OSError."""

    closed = 0

    def close(self):
        # Later calls are silent no-ops; only the first one fails.
        if self.closed:
            return
        self.closed = 1
        raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000165
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO on top of the C implementation's RawIOBase."""


class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO on top of the pure-Python RawIOBase."""
171
172
class MockFileIO:
    """Mixin that logs the size of every read()/readinto() result.

    It must be combined with a concrete stream class; construction and the
    actual reading are delegated to it through super().
    """

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        # A None result is logged as None, anything else by its length.
        size = len(chunk) if chunk is not None else None
        self.read_history.append(size)
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000188
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO on top of the C implementation's BytesIO."""


class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO on top of the pure-Python BytesIO."""
194
195
class MockUnseekableIO:
    """Mixin that makes a stream report itself, and act, as unseekable.

    The class it is mixed into must provide an ``UnsupportedOperation``
    attribute naming the exception type to raise.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def tell(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error
205
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """MockUnseekableIO on top of the C implementation's BytesIO."""

    # Exception type raised by the inherited seek() and tell().
    UnsupportedOperation = io.UnsupportedOperation


class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """MockUnseekableIO on top of the pure-Python BytesIO."""

    # Exception type raised by the inherited seek() and tell().
    UnsupportedOperation = pyio.UnsupportedOperation
211
212
class MockNonBlockWriterIO:
    """Writable mock that can pretend a write would block.

    After block_on(char), a write() whose data contains ``char`` stores
    only the bytes before it.  A write() whose data starts with ``char``
    stores nothing, disarms the blocker and returns None.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return all bytes stored so far and clear the store."""
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        data = bytes(b)
        if self._blocker_char:
            try:
                cut = data.index(self._blocker_char)
            except ValueError:
                # Blocker not present: fall through to a complete write.
                cut = None
            if cut is not None:
                if cut > 0:
                    # write data up to the first blocker
                    self._write_stack.append(data[:cut])
                    return cut
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        self._write_stack.append(data)
        return len(data)
256
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO on top of the C implementation's RawIOBase."""

    # Matching BlockingIOError type of the C implementation.
    BlockingIOError = io.BlockingIOError


class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO on top of the pure-Python RawIOBase."""

    # Matching BlockingIOError type of the pure-Python implementation.
    BlockingIOError = pyio.BlockingIOError
262
Guido van Rossuma9e20242007-03-08 00:43:48 +0000263
class IOTest(unittest.TestCase):
    # General tests of opening, reading, writing, seeking and closing.
    #
    # The methods use attributes that are not defined in this class
    # (self.open, self.FileIO, self.BytesIO, self.StringIO, self.IOBase,
    # self.RawIOBase, self.BufferedIOBase, self.TextIOBase,
    # self.UnsupportedOperation, self.SEEK_CUR, self.SEEK_END,
    # self.MockRawIOWithoutRead); presumably they are bound to the C or
    # Python implementation elsewhere in the file -- TODO confirm.
    #
    # Comments are used instead of docstrings on test methods so that the
    # names reported by unittest do not change.

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        # Helper: run a fixed write/seek/truncate sequence against f,
        # checking every return value.
        self.assertEqual(f.write(b"blah."), 5)
        f.truncate(0)
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        # Helper: run a fixed read/readinto/seek sequence against f, which
        # must contain b"hello world\n".  The extra checks under
        # ``buffered`` call read() without a size argument.
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by the large-file tests (2 GiB).
    LARGE = 2**31

    def large_file_ops(self, f):
        # Helper: seek/write/truncate around the LARGE offset.
        # assertTrue rather than a bare ``assert`` so that the checks are
        # still performed when Python runs with -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        exc = self.UnsupportedOperation
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(exc, fp.read)
                self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "wb", buffering=0) as fp:
            self.assertRaises(exc, fp.read)
            self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "rb", buffering=0) as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(exc, fp.write, "blah")
            self.assertRaises(exc, fp.writelines, ["blah\n"])
            # Non-zero seeking from current or end pos
            self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
            self.assertRaises(exc, fp.seek, -1, self.SEEK_END)

    def test_open_handles_NUL_chars(self):
        fn_with_NUL = 'foo\0bar'
        self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
        self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')

    def test_raw_file_io(self):
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; It takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform.startswith('win') or sys.platform == 'darwin':
            support.requires(
                'largefile',
                'test requires %s bytes and a long time to run' % self.LARGE)
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # The file must be closed on leaving the with block, whether it is
        # left normally or through an exception.
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        # Collecting an unclosed FileIO subclass instance must run
        # __del__, close() and flush(), in that order.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        with support.check_warnings(('', ResourceWarning)):
            f = MyFileIO(support.TESTFN, "wb")
            f.write(b"xxx")
            del f
            support.gc_collect()
            self.assertEqual(record, [1, 2, 3])
            with self.open(support.TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        # Helper: same ordering check as test_destructor, for a subclass
        # of the abstract class ``base``.
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        # write() must accept an array and report its size in bytes.
        a = array.array('i', range(10))
        n = len(a.tobytes())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        with support.check_warnings(('', ResourceWarning)):
            f = self.FileIO(support.TESTFN, "wb")
            f.write(b"abcxxx")
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise OSError()
        f.flush = bad_flush
        self.assertRaises(OSError, f.close) # exception not swallowed
        self.assertTrue(f.closed)

    def test_multi_close(self):
        # Repeated close() calls are allowed; flush() afterwards is not.
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")

    def test_types_have_dict(self):
        test = (
            self.IOBase(),
            self.RawIOBase(),
            self.TextIOBase(),
            self.StringIO(),
            self.BytesIO()
        )
        for obj in test:
            self.assertTrue(hasattr(obj, "__dict__"))

    def test_opener(self):
        # The descriptor returned by ``opener`` is used instead of the
        # (non-existent) file name.
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        fd = os.open(support.TESTFN, os.O_RDONLY)
        def opener(path, flags):
            return fd
        with self.open("non-existent", "r", opener=opener) as f:
            self.assertEqual(f.read(), "egg\n")

    def test_fileio_closefd(self):
        # Issue #4841
        with self.open(__file__, 'rb') as f1, \
             self.open(__file__, 'rb') as f2:
            fileio = self.FileIO(f1.fileno(), closefd=False)
            # .__init__() must not close f1
            fileio.__init__(f2.fileno(), closefd=False)
            f1.readline()
            # .close() must not close f2
            fileio.close()
            f2.readline()

    def test_nonbuffered_textio(self):
        # The rejected open() call must not leave anything behind that
        # emits a warning when collected.
        with warnings.catch_warnings(record=True) as recorded:
            with self.assertRaises(ValueError):
                self.open(support.TESTFN, 'w', buffering=0)
            support.gc_collect()
        self.assertEqual(recorded, [])

    def test_invalid_newline(self):
        # Same check as above for an invalid ``newline`` argument.
        with warnings.catch_warnings(record=True) as recorded:
            with self.assertRaises(ValueError):
                self.open(support.TESTFN, 'w', newline='invalid')
            support.gc_collect()
        self.assertEqual(recorded, [])
665
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200666
class CIOTest(IOTest):
    # IOTest plus one extra regression test (see the issue number below).

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        obj.obj = obj
        wr = weakref.ref(obj)
        del MyIO
        del obj
        support.gc_collect()
        # assertIsNone reports the surviving object if the cycle was not
        # collected, instead of a bare "False is not true".
        self.assertIsNone(wr(), wr)
685 self.assertTrue(wr() is None, wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000686
class PyIOTest(IOTest):
    # Adds no tests of its own; everything is inherited from IOTest.
    # NOTE(review): presumably run against the pure-Python implementation
    # through attributes bound elsewhere in the file -- confirm.
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000689
Guido van Rossuma9e20242007-03-08 00:43:48 +0000690
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom.
    # The buffered type under test is read from ``self.tp``.

    def test_detach(self):
        # detach() returns the raw stream; detaching a second time fails.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        self.assertIs(bufio.detach(), rawio)
        self.assertRaises(ValueError, bufio.detach)

    def test_fileno(self):
        # The test expects 42 from a buffered object wrapping a MockRawIO.
        bufio = self.tp(self.MockRawIO())
        self.assertEqual(42, bufio.fileno())

    @unittest.skip('test having existential crisis')
    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        bufio = self.tp(self.MockRawIO())
        # Invalid whence
        for whence in (-1, 9):
            self.assertRaises(ValueError, bufio.seek, 0, whence)

    def test_override_destructor(self):
        # Record the order in which __del__, close() and flush() run when a
        # subclass instance is destroyed (1, 2 and 3 respectively).
        calls = []
        class MyBufferedIO(self.tp):
            def __del__(self):
                calls.append(1)
                try:
                    parent_del = super().__del__
                except AttributeError:
                    pass
                else:
                    parent_del()
            def close(self):
                calls.append(2)
                super().close()
            def flush(self):
                calls.append(3)
                super().flush()
        bufio = MyBufferedIO(self.MockRawIO())
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        # flush() is only expected in the record for writable objects.
        expected = [1, 2, 3] if writable else [1, 2]
        self.assertEqual(calls, expected)

    def test_context_manager(self):
        # Test usability as a context manager
        bufio = self.tp(self.MockRawIO())
        def enter_and_exit():
            with bufio:
                pass
        enter_and_exit()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, enter_and_exit)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def touch_missing_attribute():
            self.tp(rawio).xyzzy
        with support.captured_output("stderr") as stderr:
            self.assertRaises(AttributeError, touch_missing_attribute)
        output = stderr.getvalue().strip()
        if output:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(output.splitlines()), 1)
            self.assertTrue(output.startswith("Exception OSError: "), output)
            self.assertTrue(output.endswith(" ignored"), output)

    def test_repr(self):
        # repr() is expected to show the qualified type name, plus the raw
        # stream's ``name`` once it has one (str or bytes).
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
        self.assertEqual(repr(bufio), "<%s>" % clsname)
        rawio.name = "dummy"
        self.assertEqual(repr(bufio), "<%s name='dummy'>" % clsname)
        rawio.name = b"dummy"
        self.assertEqual(repr(bufio), "<%s name=b'dummy'>" % clsname)

    def test_flush_error_on_close(self):
        # A failing raw flush() must propagate out of close(), and the
        # buffered object must still end up closed.
        rawio = self.MockRawIO()
        def bad_flush():
            raise OSError()
        rawio.flush = bad_flush
        bufio = self.tp(rawio)
        with self.assertRaises(OSError): # exception not swallowed
            bufio.close()
        self.assertTrue(bufio.closed)

    def test_close_error_on_close(self):
        # When both flush() and the raw close() fail, the close error is
        # raised with the flush error chained as its __context__.
        rawio = self.MockRawIO()
        def bad_flush():
            raise OSError('flush')
        def bad_close():
            raise OSError('close')
        rawio.close = bad_close
        bufio = self.tp(rawio)
        bufio.flush = bad_flush
        with self.assertRaises(OSError) as err: # exception not swallowed
            bufio.close()
        close_error = err.exception
        self.assertEqual(close_error.args, ('close',))
        self.assertIsInstance(close_error.__context__, OSError)
        self.assertEqual(close_error.__context__.args, ('flush',))
        self.assertFalse(bufio.closed)

    def test_nonnormalized_close_error_on_close(self):
        # Issue #21677
        # The undefined names below are deliberate: evaluating them raises
        # NameError, which is the error type this test checks for.
        rawio = self.MockRawIO()
        def bad_flush():
            raise non_existing_flush
        def bad_close():
            raise non_existing_close
        rawio.close = bad_close
        bufio = self.tp(rawio)
        bufio.flush = bad_flush
        with self.assertRaises(NameError) as err: # exception not swallowed
            bufio.close()
        close_error = err.exception
        self.assertIn('non_existing_close', str(close_error))
        self.assertIsInstance(close_error.__context__, NameError)
        self.assertIn('non_existing_flush', str(close_error.__context__))
        self.assertFalse(bufio.closed)

    def test_multi_close(self):
        # close() may be called repeatedly; flush() afterwards is an error.
        bufio = self.tp(self.MockRawIO())
        for _ in range(3):
            bufio.close()
        self.assertRaises(ValueError, bufio.flush)

    def test_unseekable(self):
        # tell() and seek() on an unseekable raw stream are unsupported.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        unsupported = self.UnsupportedOperation
        self.assertRaises(unsupported, bufio.tell)
        self.assertRaises(unsupported, bufio.seek, 0)

    def test_readonly_attributes(self):
        # The ``raw`` attribute cannot be rebound.
        bufio = self.tp(self.MockRawIO())
        replacement = self.MockRawIO()
        with self.assertRaises(AttributeError):
            bufio.raw = replacement
845
Guido van Rossum78892e42007-04-06 17:31:18 +0000846
class SizeofTest:
    # sys.getsizeof() expectations for buffered objects (CPython only).

    @support.cpython_only
    def test_sizeof(self):
        # The reported size is expected to grow by exactly the difference
        # between the two buffer sizes.
        small, large = 4096, 8192
        bufio = self.tp(self.MockRawIO(), buffer_size=small)
        overhead = sys.getsizeof(bufio) - small
        bufio = self.tp(self.MockRawIO(), buffer_size=large)
        self.assertEqual(sys.getsizeof(bufio), overhead + large)

    @support.cpython_only
    def test_buffer_freeing(self):
        # After close() the reported size no longer includes the buffer.
        buffer_bytes = 4096
        bufio = self.tp(self.MockRawIO(), buffer_size=buffer_bytes)
        overhead = sys.getsizeof(bufio) - buffer_bytes
        bufio.close()
        self.assertEqual(sys.getsizeof(bufio), overhead)
867 self.assertEqual(sys.getsizeof(bufio), size)
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200868
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Tests for buffered readers. The type under test is read from
    # ``self.tp``, which subclasses are expected to define.
    read_mode = "rb"

    def test_constructor(self):
        # __init__() can be re-run with valid buffer sizes; sizes <= 0 must
        # raise ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_uninitialized(self):
        # An object created with __new__() only must be safely destroyable
        # and must refuse to read until __init__() has run.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.read, 0)
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.read(0), b'')

    def test_read(self):
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # The rawio._reads checks pin how many raw reads each call may make.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        # A short read only overwrites the start of the target buffer.
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")
        rawio = self.MockRawIO((b"abc", None))
        bufio = self.tp(rawio)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"cb")

    def test_readinto1(self):
        # The rawio._reads checks pin how many raw reads each call may make.
        buffer_size = 10
        rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
        bufio = self.tp(rawio, buffer_size=buffer_size)
        b = bytearray(2)
        self.assertEqual(bufio.peek(3), b'abc')
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 1)
        self.assertEqual(b[:1], b"c")
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"de")
        self.assertEqual(rawio._reads, 2)
        b = bytearray(2*buffer_size)
        self.assertEqual(bufio.peek(3), b'fgh')
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(bufio.readinto1(b), 6)
        self.assertEqual(b[:6], b"fghjkl")
        self.assertEqual(rawio._reads, 4)

    def _check_readinto_array(self, method_name):
        # Shared body of the readinto()/readinto1() array tests.
        # ``method_name`` names the bound method of the buffered object that
        # is called with the array as its only argument.
        buffer_size = 60
        data = b"a" * 26
        rawio = self.MockRawIO((data,))
        bufio = self.tp(rawio, buffer_size=buffer_size)

        # Create an array with element size > 1 byte
        b = array.array('i', b'x' * 32)
        # Check the precondition directly. A unittest assertion is used
        # because a bare ``assert`` is stripped under -O.
        self.assertGreater(b.itemsize, 1)

        # Read into it. We should get as many *bytes* as we can fit into b
        # (which is more than the number of elements)
        n = getattr(bufio, method_name)(b)
        self.assertGreater(n, len(b))

        # Check that old contents of b are preserved
        bm = memoryview(b).cast('B')
        self.assertLess(n, len(bm))
        self.assertEqual(bm[:n], data[:n])
        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))

    def test_readinto_array(self):
        self._check_readinto_array('readinto')

    def test_readinto1_array(self):
        self._check_readinto_array('readinto1')

    def test_readlines(self):
        # Each assertion gets a fresh reader over the same three chunks.
        def bufio():
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes passed to bufio.read(), and the
        # value rawio.read_history is expected to hold afterwards.
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())

    def test_read_past_eof(self):
        # Asking for more than is available returns what there is.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            l = list(range(256)) * N
            random.shuffle(l)
            s = bytes(bytearray(l))
            with self.open(support.TESTFN, "wb") as f:
                f.write(s)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def f():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            s = bufio.read(n)
                            if not s:
                                break
                            # list.append() is atomic
                            results.append(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                # Every byte value must have been read exactly N times.
                s = b''.join(results)
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(s.count(c), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        # Same checks as the common test, repeated after a read.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        bufio.read(1)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        self.assertRaises(self.UnsupportedOperation, bufio.tell)

    def test_misbehaved_io(self):
        # seek() and tell() over a MisbehavedRawIO must raise OSError.
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            rawio = self.MockRawIO([b"x" * n])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            # Simple case: one raw read is enough to satisfy the request.
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1139
1140
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the reader tests, plus the sizeof tests, against
    # io.BufferedReader.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed __init__(), read() must raise ValueError too.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(OSError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The FileIO below creates TESTFN on disk; remove it once the test
        # is over so it cannot leak into later tests.
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            # Self-reference: only the cyclic collector can reclaim f.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1187
1188
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the reader tests against pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001191
Guido van Rossuma9e20242007-03-08 00:43:48 +00001192
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001193class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1194 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001195
def test_constructor(self):
    # __init__() can be re-run with valid buffer sizes and the writer keeps
    # working; sizes <= 0 must raise ValueError.
    rawio = self.MockRawIO()
    writer = self.tp(rawio)
    writer.__init__(rawio)
    for good_size in (1024, 16):
        writer.__init__(rawio, buffer_size=good_size)
    self.assertEqual(3, writer.write(b"abc"))
    writer.flush()
    for bad_size in (0, -16, -1):
        self.assertRaises(ValueError, writer.__init__, rawio,
                          buffer_size=bad_size)
    writer.__init__(rawio)
    self.assertEqual(3, writer.write(b"ghi"))
    writer.flush()
    self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001211
def test_uninitialized(self):
    # An object created with __new__() only must be safely destroyable and
    # must refuse to write until __init__() has run.
    cls = self.tp
    discarded = cls.__new__(cls)
    del discarded
    writer = cls.__new__(cls)
    self.assertRaisesRegex((ValueError, AttributeError),
                           'uninitialized|has no attribute',
                           writer.write, b'')
    writer.__init__(self.MockRawIO())
    self.assertEqual(writer.write(b''), 0)
1221
def test_detach_flush(self):
    # Data written before detach() reaches the raw stream only at detach().
    rawio = self.MockRawIO()
    writer = self.tp(rawio)
    writer.write(b"howdy!")
    self.assertFalse(rawio._write_stack)
    writer.detach()
    self.assertEqual(rawio._write_stack, [b"howdy!"])
1229
def test_write(self):
    # Write to the buffered IO but don't overflow the buffer.
    rawio = self.MockRawIO()
    buffered = self.tp(rawio, 8)
    buffered.write(b"abc")
    # Nothing may have reached the raw stream yet.
    self.assertFalse(rawio._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001236
def test_write_overflow(self):
    # Write 16 bytes in 3-byte slices through an 8-byte buffer.
    rawio = self.MockRawIO()
    buffered = self.tp(rawio, 8)
    contents = b"abcdefghijklmnop"
    for start in range(0, len(contents), 3):
        buffered.write(contents[start:start+3])
    flushed = b"".join(rawio._write_stack)
    # At least (total - 8) bytes were implicitly flushed, perhaps more
    # depending on the implementation.
    self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001247
def check_writes(self, intermediate_func):
    # Lots of writes, test the flushed output is as expected.
    # ``intermediate_func`` is called with the buffered object after every
    # write.
    contents = bytes(range(256)) * 1000
    total = len(contents)
    rawio = self.MockRawIO()
    bufio = self.tp(rawio, 13)
    # Generator of write sizes: repeat each N 15 times then proceed to N+1
    sizes = (size for size in count(1) for _ in range(15))
    offset = 0
    while offset < total:
        size = min(next(sizes), total - offset)
        self.assertEqual(bufio.write(contents[offset:offset+size]), size)
        intermediate_func(bufio)
        offset += size
    bufio.flush()
    self.assertEqual(contents, b"".join(rawio._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001267
def test_writes(self):
    # Plain sequence of writes with nothing done in between.
    def do_nothing(bufio):
        return None
    self.check_writes(do_nothing)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001270
def test_writes_and_flushes(self):
    # Flush explicitly after every write.
    def flush_after_write(bufio):
        return bufio.flush()
    self.check_writes(flush_after_write)
Guido van Rossum01a27522007-03-07 01:00:12 +00001273
def test_writes_and_seeks(self):
    # After every write, seek around and return to the original position,
    # first with absolute seeks, then with relative ones.
    def seek_absolute(bufio):
        here = bufio.tell()
        bufio.seek(here + 1, 0)
        bufio.seek(here - 1, 0)
        bufio.seek(here, 0)
    self.check_writes(seek_absolute)

    def seek_relative(bufio):
        here = bufio.seek(0, 1)
        bufio.seek(+1, 1)
        bufio.seek(-1, 1)
        bufio.seek(here, 0)
    self.check_writes(seek_relative)
Guido van Rossum01a27522007-03-07 01:00:12 +00001287
def test_writes_and_truncates(self):
    # Truncate at the current position after every write.
    def truncate_here(bufio):
        return bufio.truncate(bufio.tell())
    self.check_writes(truncate_here)
Guido van Rossum01a27522007-03-07 01:00:12 +00001290
def test_write_non_blocking(self):
    # Writes through an 8-byte buffer onto a raw stream that can be told to
    # block on a given byte.
    rawio = self.MockNonBlockWriterIO()
    writer = self.tp(rawio, 8)

    self.assertEqual(writer.write(b"abcd"), 4)
    self.assertEqual(writer.write(b"efghi"), 5)
    # 1 byte will be written, the rest will be buffered
    rawio.block_on(b"k")
    self.assertEqual(writer.write(b"jklmn"), 5)

    # 8 bytes will be written, 8 will be buffered and the rest will be lost
    rawio.block_on(b"0")
    try:
        writer.write(b"opqrwxyz0123456789")
    except self.BlockingIOError as exc:
        accepted = exc.characters_written
    else:
        self.fail("BlockingIOError should have been raised")
    self.assertEqual(accepted, 16)
    self.assertEqual(rawio.pop_written(),
                     b"abcdefghijklmnopqrwxyz")

    self.assertEqual(writer.write(b"ABCDEFGHI"), 9)
    flushed = rawio.pop_written()
    # Previously buffered bytes were flushed
    self.assertTrue(flushed.startswith(b"01234567A"), flushed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001317
def test_write_and_rewind(self):
    # Overwrite the start of already-written data after seeking back, then
    # append more at the end.
    target = io.BytesIO()
    writer = self.tp(target, 4)
    self.assertEqual(writer.write(b"abcdef"), 6)
    self.assertEqual(writer.tell(), 6)
    writer.seek(0, 0)
    self.assertEqual(writer.write(b"XY"), 2)
    writer.seek(6, 0)
    self.assertEqual(target.getvalue(), b"XYcdef")
    self.assertEqual(writer.write(b"123456"), 6)
    writer.flush()
    self.assertEqual(target.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001330
def test_flush(self):
    # flush() pushes the buffered bytes down to the raw stream.
    rawio = self.MockRawIO()
    buffered = self.tp(rawio, 8)
    buffered.write(b"abc")
    buffered.flush()
    self.assertEqual(b"abc", rawio._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001337
def test_writelines(self):
    # writelines() accepts a list of bytes objects.
    chunks = [b'ab', b'cd', b'ef']
    rawio = self.MockRawIO()
    buffered = self.tp(rawio, 8)
    buffered.writelines(chunks)
    buffered.flush()
    self.assertEqual(b''.join(rawio._write_stack), b'abcdef')
1345
def test_writelines_userlist(self):
    # writelines() also accepts a UserList of bytes objects.
    chunks = UserList([b'ab', b'cd', b'ef'])
    rawio = self.MockRawIO()
    buffered = self.tp(rawio, 8)
    buffered.writelines(chunks)
    buffered.flush()
    self.assertEqual(b''.join(rawio._write_stack), b'abcdef')
1353
def test_writelines_error(self):
    # A list of ints, None and a str must each be rejected with TypeError.
    buffered = self.tp(self.MockRawIO(), 8)
    for bad_lines in ([1, 2, 3], None, 'abc'):
        self.assertRaises(TypeError, buffered.writelines, bad_lines)
1360
def test_destructor(self):
    # Destroying the buffered object must flush its pending data.
    rawio = self.MockRawIO()
    buffered = self.tp(rawio, 8)
    buffered.write(b"abc")
    del buffered
    support.gc_collect()
    self.assertEqual(b"abc", rawio._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001368
def test_truncate(self):
    # Truncate implicitly flushes the buffer.
    # The test writes a real file; register its removal first so it is
    # deleted even if an assertion below fails.
    self.addCleanup(support.unlink, support.TESTFN)
    with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
        bufio = self.tp(raw, 8)
        bufio.write(b"abcdef")
        self.assertEqual(bufio.truncate(3), 3)
        # Truncating shortens the file but leaves the position at 6.
        self.assertEqual(bufio.tell(), 6)
    with self.open(support.TESTFN, "rb", buffering=0) as f:
        self.assertEqual(f.read(), b"abc")
1378
@unittest.skipUnless(threading, 'Threading required for this test.')
@support.requires_resource('cpu')
def test_threads(self):
    try:
        # Write out many bytes from many threads and test they were
        # all flushed.
        N = 1000
        contents = bytes(range(256)) * N
        chunk_sizes = cycle([1, 19])
        offset = 0
        pending = deque()
        while offset < len(contents):
            chunk_size = next(chunk_sizes)
            pending.append(contents[offset:offset+chunk_size])
            offset += chunk_size
        del contents
        # We use a real file object because it allows us to
        # exercise situations where the GIL is released before
        # writing the buffer to the raw streams. This is in addition
        # to concurrency issues due to switching threads in the middle
        # of Python code.
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            errors = []
            def worker():
                # Drain the shared queue until it is empty.
                try:
                    while True:
                        try:
                            chunk = pending.popleft()
                        except IndexError:
                            return
                        bufio.write(chunk)
                except Exception as exc:
                    errors.append(exc)
                    raise
            threads = [threading.Thread(target=worker) for _ in range(20)]
            for thread in threads:
                thread.start()
            time.sleep(0.02) # yield
            for thread in threads:
                thread.join()
            self.assertFalse(errors,
                "the following exceptions were caught: %r" % errors)
            bufio.close()
        with self.open(support.TESTFN, "rb") as f:
            written = f.read()
        # Every byte value must have been written exactly N times.
        for value in range(256):
            self.assertEqual(written.count(bytes([value])), N)
    finally:
        support.unlink(support.TESTFN)
1429
def test_misbehaved_io(self):
    # seek(), tell() and write() on top of a MisbehavedRawIO must each
    # raise OSError.
    bufio = self.tp(self.MisbehavedRawIO(), 5)
    with self.assertRaises(OSError):
        bufio.seek(0)
    with self.assertRaises(OSError):
        bufio.tell()
    with self.assertRaises(OSError):
        bufio.write(b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001436
def test_max_buffer_size_removal(self):
    # A third positional argument is rejected with TypeError.
    with self.assertRaises(TypeError):
        raw = self.MockRawIO()
        self.tp(raw, 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001440
def test_write_error_on_close(self):
    # close() must propagate the OSError raised by the raw write and
    # still leave the buffered object marked as closed.
    def failing_write(data):
        raise OSError()

    raw = self.MockRawIO()
    raw.write = failing_write
    bufio = self.tp(raw)
    bufio.write(b'spam')
    with self.assertRaises(OSError): # exception not swallowed
        bufio.close()
    self.assertTrue(bufio.closed)
1450
Benjamin Peterson59406a92009-03-26 17:10:29 +00001451
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the shared BufferedWriterTest cases against io.BufferedWriter and
    # adds checks that only apply to that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # Re-initializing with an invalid buffer size raises ValueError, and
        # so does a write attempted right after the failed __init__.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        # Remove the scratch file even if one of the assertions fails.
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            f.write(b"123xxx")
            # Self-reference: the object is only reclaimable by the cycle
            # collector, which is what gc_collect() triggers below.
            f.x = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        # The TypeError message must name the class being constructed.
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1495
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001496
# Runs the shared BufferedWriterTest cases against pyio.BufferedWriter.
class PyBufferedWriterTest(BufferedWriterTest):
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001499
class BufferedRWPairTest(unittest.TestCase):
    # Shared tests for BufferedRWPair.  The class under test is read from
    # ``self.tp``; the raw stream helpers come from ``self.MockRawIO`` and
    # ``self.BytesIO``.

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_uninitialized(self):
        # An instance created with __new__ only can be deleted, must reject
        # read/write until __init__ has run, and works normally afterwards.
        pair = self.tp.__new__(self.tp)
        del pair
        pair = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               pair.read, 0)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               pair.write, b'')
        pair.__init__(self.MockRawIO(), self.MockRawIO())
        self.assertEqual(pair.read(0), b'')
        self.assertEqual(pair.write(b''), 0)

    def test_detach(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_removal(self):
        # A fourth positional argument is rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        # None is accepted and means "read everything".
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        # A fresh pair is built for every call so each one starts at offset 0.
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        # An explicit None hint must behave like no hint at all.
        self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        # Each write is flushed separately, so the raw writer sees two chunks.
        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # peek() may return more than requested and must not consume data.
        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        # The pair reports a tty as soon as either side does.
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001630
# Runs the shared BufferedRWPairTest cases against io.BufferedRWPair.
class CBufferedRWPairTest(BufferedRWPairTest):
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001633
# Runs the shared BufferedRWPairTest cases against pyio.BufferedRWPair.
class PyBufferedRWPairTest(BufferedRWPairTest):
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001636
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001637
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Combines the reader and writer test suites and adds tests that mix
    # reads, writes and seeks on one buffered object.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Run both parents' versions explicitly.
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_constructor(self)

    def test_uninitialized(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_uninitialized(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        stream = self.tp(raw, 8)

        self.assertEqual(b"as", stream.read(2))
        for chunk in (b"ddd", b"eee"):
            stream.write(chunk)
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", stream.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        stream = self.tp(raw)

        self.assertEqual(b"as", stream.read(2))
        self.assertEqual(2, stream.tell())
        stream.seek(0, 0)
        self.assertEqual(b"asdf", stream.read(4))

        # Overwrite four bytes in the middle, then re-read from the start.
        stream.write(b"123f")
        stream.seek(0, 0)
        self.assertEqual(b"asdf123fl", stream.read())
        self.assertEqual(9, stream.tell())
        # Relative to the end, then relative to the current position.
        stream.seek(-4, 2)
        self.assertEqual(5, stream.tell())
        stream.seek(2, 1)
        self.assertEqual(7, stream.tell())
        self.assertEqual(b"fl", stream.read(11))
        stream.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        with self.assertRaises(TypeError):
            stream.seek(0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario: ``read_func(bufio[, n])`` is the reading primitive
        # under test and must return the bytes it consumed.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        # Change the raw stream behind the buffered object's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        def _read(bufio, *args):
            return bufio.read(*args)
        self.check_flush_and_read(_read)

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # A negative size means "as much as there is": use a large buffer.
            size = n if n >= 0 else 9999
            target = bytearray(size)
            got = bufio.readinto(target)
            return bytes(target[:got])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            data = bufio.peek(n)
            if n != -1:
                data = data[:n]
            # peek() does not advance, so skip over what was returned.
            bufio.seek(len(data), 1)
            return data
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        for chunk in (b"123", b"45"):
            bufio.write(chunk)
            bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_threads(self)

    def test_writes_and_peek(self):
        # First variant: peek at the current position between writes.
        def _peek_here(bufio):
            bufio.peek(1)
        self.check_writes(_peek_here)

        # Second variant: step back one byte, peek, then restore the position.
        def _peek_behind(bufio):
            saved = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(saved, 0)
        self.check_writes(_peek_behind)

    def test_writes_and_reads(self):
        def _reread_last(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_reread_last)

    def test_writes_and_read1s(self):
        def _reread_last(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_reread_last)

    def test_writes_and_readintos(self):
        def _reread_last(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_reread_last)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            end_pos = overwrite_size + 1
            self.assertEqual(bufio.tell(), end_pos)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), end_pos)
            stored = raw.getvalue()
            self.assertEqual(stored,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        original = b"\x80\x81\x82\x83\x84"
        length = len(original)
        for i in range(0, length):
            for j in range(i, length):
                raw = self.BytesIO(original)
                bufio = self.tp(raw, 100)
                mutate(bufio, i, j)
                bufio.flush()
                # The write at i happens last, so it wins when i == j.
                expected = bytearray(original)
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as raw:
            with self.tp(raw, 100) as stream:
                stream.write(b"1")
                self.assertEqual(stream.read(1), b'b')
                stream.write(b'2')
                self.assertEqual(stream.read1(1), b'd')
                stream.write(b'3')
                one_byte = bytearray(1)
                stream.readinto(one_byte)
                self.assertEqual(one_byte, b'f')
                stream.write(b'4')
                self.assertEqual(stream.peek(1), b'h')
                stream.flush()
                self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as raw:
            with self.tp(raw, 100) as stream:
                self.assertEqual(stream.read(1), b'a')
                stream.write(b"2")
                self.assertEqual(stream.read(1), b'c')
                stream.flush()
                self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        # Alternate one-byte writes with readline() calls and check the
        # resulting contents of the raw stream.
        with self.BytesIO(b'ab\ncdef\ng\n') as raw:
            with self.tp(raw) as stream:
                stream.write(b'1')
                self.assertEqual(stream.readline(), b'b\n')
                stream.write(b'2')
                self.assertEqual(stream.readline(), b'def\n')
                stream.write(b'3')
                self.assertEqual(stream.readline(), b'\n')
                stream.flush()
                self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None
1865
R David Murray67bfe802013-02-23 21:51:05 -05001866
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    # Runs the shared BufferedRandomTest cases against io.BufferedRandom.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            bufio.__init__(rawio, sys.maxsize)

    def test_garbage_collection(self):
        # Run both the reader-side and the writer-side collection tests.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1888
1889
# Runs the shared BufferedRandomTest cases against pyio.BufferedRandom.
class PyBufferedRandomTest(BufferedRandomTest):
    tp = pyio.BufferedRandom
1892
1893
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001894# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1895# properties:
1896# - A single output character can correspond to many bytes of input.
1897# - The number of input bytes to complete the character can be
1898# undetermined until the last input byte is received.
1899# - The number of input bytes can vary depending on previous input.
1900# - A single input byte can correspond to many characters of output.
1901# - The number of output characters can be undetermined until the
1902# last input byte is received.
1903# - The number of output characters can vary depending on previous input.
1904
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    The input is read as a sequence of words.  With input length I > 0 a
    word is exactly I bytes long; with I == 0 a word is terminated by a
    period, and periods arriving while no word is buffered are ignored.
    Words are interpreted as follows:
      - 'i' followed by a number sets the input length, I (maximum 99).
      - 'o' followed by a number sets the output length, O (maximum 99).
        In both cases a missing number counts as 0.
      - Any other word is emitted followed by a period.  If O is non-zero
        the word is first truncated, or padded with hyphens, to exactly O
        characters; if O is 0 it is emitted verbatim.
    I and O both start at 1.  When decoding is final, whatever is left in
    the buffer is processed as the last word.
    """

    def __init__(self, errors='strict'):
        super().__init__(errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        # Back to the initial mode with nothing buffered.
        self.buffer = bytearray()
        self.i = self.o = 1

    def getstate(self):
        # Both lengths are XORed with 1 so that flags = 0 after reset().
        flags = (self.i ^ 1) * 100 + (self.o ^ 1)
        return bytes(self.buffer), flags

    def setstate(self, state):
        pending, flags = state
        packed_i, packed_o = divmod(flags, 100)
        self.buffer = bytearray(pending)
        self.i = packed_i ^ 1
        self.o = packed_o ^ 1

    def decode(self, input, final=False):
        period = ord('.')
        pieces = []
        for byte in input:
            # self.i is re-read for every byte: a control word processed
            # earlier in this same call may have switched the mode.
            if self.i != 0:
                # fixed-length, terminate after self.i bytes
                self.buffer.append(byte)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
            elif byte != period:
                # variable-length, still collecting the word
                self.buffer.append(byte)
            elif self.buffer:
                # variable-length, terminated with period
                pieces.append(self.process_word())
        if final and self.buffer:
            # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        # Consume the buffered word and return the text it produces; the
        # control words 'i...' and 'o...' produce no text.
        marker = self.buffer[0]
        if marker == ord('i'):
            text = ''
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif marker == ord('o'):
            text = ''
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            text = self.buffer.decode('ascii')
            if self.o:
                # pad out with hyphens, then truncate to output length
                text = text.ljust(self.o, '-')[:self.o]
            text += '.'
        self.buffer = bytearray()
        return text

    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        # Codec search function: answers only for 'test_decoder', and only
        # while codecEnabled is set; otherwise falls through and returns None.
        if not cls.codecEnabled or name != 'test_decoder':
            return None
        latin1 = codecs.lookup('latin-1')
        return codecs.CodecInfo(
            name='test_decoder', encode=latin1.encode, decode=None,
            incrementalencoder=None,
            streamreader=None, streamwriter=None,
            incrementaldecoder=cls)
1989
# Register the lookup function of the decoder defined above with the codec
# registry.  It is disabled by default: the function only answers for the
# name 'test_decoder', and only while StatefulIncrementalDecoder.codecEnabled
# is true, so tests that need the codec have to enable it themselves.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1993
1994
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Sanity checks for the StatefulIncrementalDecoder test helper itself.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.'
         'b----------------------------.'
         'cde--------------------------.'
         'abcdefghijabcde.'
         'a.b------------.'
         '.c.------------.'
         'd.e------------.'
         'k--------------.'
         'l--------------.'
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for data, is_final, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(data, is_final), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002037
2038class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002039
def setUp(self):
    # Start from a clean slate, then prepare sample data with mixed line
    # endings and the same text with every ending written as "\n".
    support.unlink(support.TESTFN)
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    normalized_bytes = b"AAA\nBBB\nCCC\nDDD\nEEE\n"
    self.normalized = normalized_bytes.decode("ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002044
def tearDown(self):
    # Remove the scratch file so it does not leak into the next test.
    support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002047
def test_constructor(self):
    # Re-run __init__ on an existing wrapper with different settings and
    # check that the new settings are the ones reported.
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    text = self.TextIOWrapper(buffered)

    text.__init__(buffered, encoding="latin-1", newline="\r\n")
    self.assertEqual(text.encoding, "latin-1")
    self.assertEqual(text.line_buffering, False)

    text.__init__(buffered, encoding="utf-8", line_buffering=True)
    self.assertEqual(text.encoding, "utf-8")
    self.assertEqual(text.line_buffering, True)
    self.assertEqual("\xe9\n", text.readline())

    # Invalid newline arguments: wrong type, then wrong value.
    with self.assertRaises(TypeError):
        text.__init__(buffered, newline=42)
    with self.assertRaises(ValueError):
        text.__init__(buffered, newline='xyzzy')
2061
def test_non_text_encoding_codecs_are_rejected(self):
    # Ensure the constructor complains if passed a codec that isn't
    # marked as a text encoding
    # http://bugs.python.org/issue20404
    buffered = self.BufferedWriter(self.BytesIO())
    expected_message = "is not a text encoding"
    with self.assertRaisesRegex(LookupError, expected_message):
        self.TextIOWrapper(buffered, encoding="hex")
2070
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)

    # detach() hands back the very object that was wrapped.
    text = self.TextIOWrapper(buffered)
    self.assertIs(text.detach(), buffered)

    # Written text is not in the raw stream before detach() and is there
    # afterwards; a second detach() is an error.
    text = self.TextIOWrapper(buffered, encoding="ascii")
    text.write("howdy")
    self.assertFalse(raw.getvalue())
    text.detach()
    self.assertEqual(raw.getvalue(), b"howdy")
    with self.assertRaises(ValueError):
        text.detach()
2083
def test_repr(self):
    # The repr gains a name and a mode field once those attributes exist;
    # the name is shown with its own repr, so bytes names keep the b prefix.
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    text = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    expected = "<%s.TextIOWrapper encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    raw.name = "dummy"
    expected = "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    text.mode = "r"
    expected = (
        "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
    self.assertEqual(repr(text), expected)

    raw.name = b"dummy"
    expected = (
        "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
    self.assertEqual(repr(text), expected)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002100
def test_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    text = self.TextIOWrapper(buffered, newline="\n", line_buffering=True)

    text.write("X")
    self.assertEqual(raw.getvalue(), b"") # No flush happened
    text.write("Y\nZ")
    self.assertEqual(raw.getvalue(), b"XY\nZ") # All got flushed
    # A write containing "\r" reaches the raw stream as well.
    text.write("A\rB")
    self.assertEqual(raw.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002111
def test_default_encoding(self):
    saved_environ = dict(os.environ)
    try:
        # try to get a user preferred encoding different than the current
        # locale encoding to check that TextIOWrapper() uses the current
        # locale encoding and not the user preferred encoding
        for variable in ('LC_ALL', 'LANG', 'LC_CTYPE'):
            os.environ.pop(variable, None)

        expected = locale.getpreferredencoding(False)
        wrapper = self.TextIOWrapper(self.BytesIO())
        self.assertEqual(wrapper.encoding, expected)
    finally:
        # Put the environment back exactly as it was found.
        os.environ.clear()
        os.environ.update(saved_environ)
2129
@support.cpython_only
def test_device_encoding(self):
    # Issue 15989
    # A fileno() result just past INT_MAX or UINT_MAX must make the
    # constructor raise OverflowError.
    import _testcapi
    raw = self.BytesIO()
    for limit in (_testcapi.INT_MAX, _testcapi.UINT_MAX):
        raw.fileno = lambda: limit + 1
        self.assertRaises(OverflowError, self.TextIOWrapper, raw)
2139
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    raw = self.BytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="utf-8")
    self.assertEqual(wrapper.encoding, "utf-8")
    # Without an explicit encoding the attribute must still be populated.
    wrapper = self.TextIOWrapper(raw)
    self.assertIsNotNone(wrapper.encoding)
    # No assertion needed here: an unknown codec name makes lookup() raise.
    codecs.lookup(wrapper.encoding)
2148
def test_encoding_errors_reading(self):
    payload = b"abc\n\xff\n"
    # (1) default and (2) explicit strict: the non-ASCII byte must raise
    for options in ({}, {"errors": "strict"}):
        reader = self.TextIOWrapper(self.BytesIO(payload),
                                    encoding="ascii", **options)
        self.assertRaises(UnicodeError, reader.read)
    # (3) ignore and (4) replace: the read succeeds with altered text
    for handler, decoded in (("ignore", "abc\n\n"),
                             ("replace", "abc\n\ufffd\n")):
        reader = self.TextIOWrapper(self.BytesIO(payload),
                                    encoding="ascii", errors=handler)
        self.assertEqual(reader.read(), decoded)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002166
def test_encoding_errors_writing(self):
    # (1) default and (2) explicit strict: the non-ASCII character raises
    for options in ({}, {"errors": "strict"}):
        sink = self.BytesIO()
        writer = self.TextIOWrapper(sink, encoding="ascii", **options)
        self.assertRaises(UnicodeError, writer.write, "\xff")
    # (3) ignore and (4) replace: the write succeeds with altered bytes
    for handler, encoded in (("ignore", b"abcdef\n"),
                             ("replace", b"abc?def\n")):
        sink = self.BytesIO()
        writer = self.TextIOWrapper(sink, encoding="ascii", errors=handler,
                                    newline="\n")
        writer.write("abc\xffdef\n")
        writer.flush()
        self.assertEqual(sink.getvalue(), encoded)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002190
def test_newlines(self):
    input_lines = ["unix\n", "windows\r\n", "os9\r", "last\n", "nonl"]
    text = ''.join(input_lines)

    # (newline argument, lines expected back from the wrapper)
    tests = [
        [None, ['unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl']],
        ['', input_lines],
        ['\n', ["unix\n", "windows\r\n", "os9\rlast\n", "nonl"]],
        ['\r\n', ["unix\nwindows\r\n", "os9\rlast\nnonl"]],
        ['\r', ["unix\nwindows\r", "\nos9\r", "last\nnonl"]],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        data = text.encode(encoding)
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    if do_reads:
                        # Mix read(2) with readline(): every line is
                        # fetched as a two-character head plus the rest.
                        got_lines = []
                        for head in iter(lambda: textio.read(2), ''):
                            self.assertEqual(len(head), 2)
                            got_lines.append(head + textio.readline())
                    else:
                        got_lines = list(textio)

                    for got_line, exp_line in zip(got_lines, exp_lines):
                        self.assertEqual(got_line, exp_line)
                    self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002232
def test_newlines_input(self):
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    # newline argument -> lines expected from readlines()
    cases = [
        (None, normalized.decode("ascii").splitlines(keepends=True)),
        ("", testdata.decode("ascii").splitlines(keepends=True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, expected in cases:
        reader = self.TextIOWrapper(self.BytesIO(testdata),
                                    encoding="ascii", newline=newline)
        self.assertEqual(reader.readlines(), expected)
        # A single read() after rewinding must return the same text.
        reader.seek(0)
        self.assertEqual(reader.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002248
def test_newlines_output(self):
    # newline argument -> bytes expected in the underlying buffer
    by_newline = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
    }
    # newline=None is expected to behave like newline=os.linesep.
    cases = [(None, by_newline[os.linesep])]
    cases.extend(sorted(by_newline.items()))
    for newline, expected in cases:
        sink = self.BytesIO()
        writer = self.TextIOWrapper(sink, encoding="ascii", newline=newline)
        for piece in ("AAA\nB", "BB\nCCC\n", "X\rY\r\nZ"):
            writer.write(piece)
        writer.flush()
        self.assertEqual(sink.closed, False)
        self.assertEqual(sink.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002266
def test_destructor(self):
    # Dropping the last reference to the wrapper must close the buffer
    # with the pending "abc" already written to it.
    seen_at_close = []
    parent = self.BytesIO
    class RecordingBytesIO(parent):
        def close(self):
            # Capture the contents before the buffer is closed.
            seen_at_close.append(self.getvalue())
            parent.close(self)
    raw = RecordingBytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.write("abc")
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002280
def test_override_destructor(self):
    # Each overridden hook appends its own marker; the final assertion
    # pins the order in which destruction invokes them.
    calls = []
    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            try:
                inherited = super().__del__
            except AttributeError:
                # The base class defines no __del__ to chain to.
                pass
            else:
                inherited()
        def close(self):
            calls.append(2)
            super().close()
        def flush(self):
            calls.append(3)
            super().flush()
    raw = self.BytesIO()
    wrapper = MyTextIO(raw, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2303
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()
    def touch_missing_attribute():
        self.TextIOWrapper(rawio).xyzzy
    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    report = captured.getvalue().strip()
    if report:
        # The destructor *may* have printed an unraisable error, check it
        self.assertEqual(len(report.splitlines()), 1)
        self.assertTrue(report.startswith("Exception OSError: "), report)
        self.assertTrue(report.endswith(" ignored"), report)
Guido van Rossum8358db22007-08-18 21:39:55 +00002318
Guido van Rossum9b76da62007-04-11 01:09:03 +00002319 # Systematic tests of the text I/O API
2320
def test_basic_io(self):
    # Write, read, seek and tell on a real file for several chunk sizes
    # and encodings. Files are opened with "with" so a failing assertion
    # cannot leave the descriptor open.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin-1", "utf-8":  # , "utf-16-be", "utf-16-le":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position just past "abc"; reused below as the EOF mark.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2349
def multi_line_test(self, f, enc):
    # Helper: empty the stream, write lines of assorted lengths while
    # recording tell() before each one, then read them back and check
    # that both the positions and the text match.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    written = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to build `size` characters.
        line = "".join(sample[i % len(sample)] for i in range(size)) + "\n"
        written.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    read_back = []
    while True:
        start = f.tell()
        line = f.readline()
        if not line:
            break
        read_back.append((start, line))
    self.assertEqual(read_back, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002371
def test_telling(self):
    # tell() after reading must match the positions recorded while
    # writing. The file is opened with "with" so it is closed even when
    # an assertion fails.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is expected to fail while the file is being iterated.
            self.assertRaises(OSError, f.tell)
        self.assertEqual(f.tell(), p2)
2391
def test_seeking(self):
    # Build a line whose ASCII prefix is two bytes shorter than the
    # default chunk size and which ends in a multi-byte character.
    prefix_size = _default_chunk_size() - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    line = prefix + u_suffix.encode("utf-8")
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        head = f.read(prefix_size)
        self.assertEqual(head, prefix.decode("ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002408
def test_seeking_too(self):
    # Regression test for a specific bug
    payload = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as f:
        f.write(payload)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        # A two-byte chunk is shorter than the three-byte character.
        f._CHUNK_SIZE = 2
        f.readline()
        f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002419
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def check_seek_and_tell(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened with "with" so a failing assertion cannot
        # leave it open.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN, encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    # Seeking back to the cookie must restore the state.
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            check_seek_and_tell(data)

        # Position each test case so that it crosses a chunk boundary.
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(data)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            check_seek_and_tell(prefix + data, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002466
def test_encoded_writes(self):
    text = "1234567890"
    doubled = text * 2
    for encoding in ("utf-16", "utf-16-le", "utf-16-be",
                     "utf-32", "utf-32-le", "utf-32-be"):
        sink = self.BytesIO()
        wrapper = self.TextIOWrapper(sink, encoding=encoding)
        # Check if the BOM is written only once (see issue1753).
        wrapper.write(text)
        wrapper.write(text)
        # Read the text back twice, rewinding before each read.
        for _ in range(2):
            wrapper.seek(0)
            self.assertEqual(wrapper.read(), doubled)
        self.assertEqual(sink.getvalue(), doubled.encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002486
def test_unreadable(self):
    # read() on a wrapper whose buffer reports readable() == False
    # must raise OSError.
    class NotReadable(self.BytesIO):
        def readable(self):
            return False
    wrapper = self.TextIOWrapper(NotReadable())
    self.assertRaises(OSError, wrapper.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00002493
def test_read_one_by_one(self):
    # Read one character at a time until read(1) returns an empty
    # string; "\r\n" must come back as a single "\n".
    txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    pieces = []
    while True:
        char = txt.read(1)
        if not char:
            break
        pieces.append(char)
    self.assertEqual("".join(pieces), "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002503
def test_readlines(self):
    txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
    every_line = ["AA\n", "BB\n", "CC"]
    # No argument and an explicit None both return every line.
    self.assertEqual(txt.readlines(), every_line)
    txt.seek(0)
    self.assertEqual(txt.readlines(None), every_line)
    txt.seek(0)
    # With a hint of 5 only the first two lines come back.
    self.assertEqual(txt.readlines(5), every_line[:2])
2511
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002512 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
def test_read_by_chunk(self):
    # make sure "\r\n" straddles 128 char boundary.
    txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
    pieces = []
    while True:
        chunk = txt.read(128)
        if not chunk:
            break
        pieces.append(chunk)
    self.assertEqual("".join(pieces), "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002523
def test_writelines(self):
    # writelines() with a plain list writes the items back to back.
    lines = ['ab', 'cd', 'ef']
    sink = self.BytesIO()
    writer = self.TextIOWrapper(sink)
    writer.writelines(lines)
    writer.flush()
    self.assertEqual(sink.getvalue(), b'abcdef')
2531
def test_writelines_userlist(self):
    # Same as the plain-list case, but with a UserList as the argument.
    lines = UserList(['ab', 'cd', 'ef'])
    sink = self.BytesIO()
    writer = self.TextIOWrapper(sink)
    writer.writelines(lines)
    writer.flush()
    self.assertEqual(sink.getvalue(), b'abcdef')
2539
def test_writelines_error(self):
    # Non-string items, None, and a bytes object must each be rejected
    # with TypeError.
    writer = self.TextIOWrapper(self.BytesIO())
    for bad_argument in ([1, 2, 3], None, b'abc'):
        self.assertRaises(TypeError, writer.writelines, bad_argument)
2545
def test_issue1395_1(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")

    # read one char at a time
    pieces = []
    while True:
        char = txt.read(1)
        if not char:
            break
        pieces.append(char)
    self.assertEqual("".join(pieces), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002557
def test_issue1395_2(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # read four chars at a time, matching the chunk size set above
    pieces = []
    while True:
        chunk = txt.read(4)
        if not chunk:
            break
        pieces.append(chunk)
    self.assertEqual("".join(pieces), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002569
def test_issue1395_3(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # Two fixed-size reads followed by three readline() calls.
    pieces = [txt.read(4), txt.read(4)]
    for _ in range(3):
        pieces.append(txt.readline())
    self.assertEqual("".join(pieces), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002580
def test_issue1395_4(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # One fixed-size read, then read() for everything that is left.
    head = txt.read(4)
    rest = txt.read()
    self.assertEqual(head + rest, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002588
def test_issue1395_5(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # Remember the position after the first four characters, rewind,
    # then seek back to it and check what comes next.
    txt.read(4)
    mark = txt.tell()
    txt.seek(0)
    txt.seek(mark)
    self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002598
def test_issue2282(self):
    # The wrapper must report the same seekable() value as its buffer.
    raw = self.BytesIO(self.testdata)
    wrapper = self.TextIOWrapper(raw, encoding="ascii")

    self.assertEqual(raw.seekable(), wrapper.seekable())
2604
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as f:
            f.write('aaa')
            end = f.tell()
        with self.open(path, 'rb') as f:
            self.assertEqual(f.read(), 'aaa'.encode(charset))

        with self.open(path, 'a', encoding=charset) as f:
            f.write('xxx')
        # Encoding the whole text in one go yields exactly one BOM.
        with self.open(path, 'rb') as f:
            self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002619
def test_seek_bom(self):
    # Same test, but when seeking manually
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as f:
            f.write('aaa')
            end = f.tell()
        with self.open(path, 'r+', encoding=charset) as f:
            # Write at the old end first, then overwrite from the start.
            f.seek(end)
            f.write('zzz')
            f.seek(0)
            f.write('bbb')
        with self.open(path, 'rb') as f:
            self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002634
def test_errors_property(self):
    # The errors attribute is "strict" when none is given, otherwise
    # whatever was passed to open().
    for options, expected in (({}, "strict"),
                              ({"errors": "replace"}, "replace")):
        with self.open(support.TESTFN, "w", **options) as f:
            self.assertEqual(f.errors, expected)
2640
@support.no_tracing
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    start_signal = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def run(n):
            text = "Thread%03d\n" % n
            # Hold every writer back until the event is set.
            start_signal.wait()
            f.write(text)
        workers = [threading.Thread(target=run, args=(x,))
                   for x in range(20)]
        for worker in workers:
            worker.start()
        time.sleep(0.02)
        start_signal.set()
        for worker in workers:
            worker.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
        # Each thread's line must appear exactly once.
        for n in range(20):
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002663
def test_flush_error_on_close(self):
    # close() must propagate an error raised by flush() and still leave
    # the wrapper marked as closed.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    def failing_flush():
        raise OSError()
    txt.flush = failing_flush
    with self.assertRaises(OSError):  # exception not swallowed
        txt.close()
    self.assertTrue(txt.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00002671
def test_close_error_on_close(self):
    # When both flush() and the buffer's close() fail, close() must raise
    # the close error with the flush error chained as its __context__.
    buffer = self.BytesIO(self.testdata)
    def bad_flush():
        raise OSError('flush')
    def bad_close():
        raise OSError('close')
    buffer.close = bad_close
    txt = self.TextIOWrapper(buffer, encoding="ascii")
    txt.flush = bad_flush
    with self.assertRaises(OSError) as err:  # exception not swallowed
        txt.close()
    self.assertEqual(err.exception.args, ('close',))
    self.assertIsInstance(err.exception.__context__, OSError)
    self.assertEqual(err.exception.__context__.args, ('flush',))
    self.assertFalse(txt.closed)

    # Silence destructor error: txt is still open, so finalizing it
    # would run the failing hooks again outside any assertion.
    buffer.close = lambda: None
    txt.flush = lambda: None
2687
def test_nonnormalized_close_error_on_close(self):
    # Issue #21677
    buffer = self.BytesIO(self.testdata)
    # The two names below are undefined on purpose: evaluating them
    # raises NameError, and the assertions look for those exact names
    # in the exception messages.
    def bad_flush():
        raise non_existing_flush
    def bad_close():
        raise non_existing_close
    buffer.close = bad_close
    txt = self.TextIOWrapper(buffer, encoding="ascii")
    txt.flush = bad_flush
    with self.assertRaises(NameError) as err:  # exception not swallowed
        txt.close()
    self.assertIn('non_existing_close', str(err.exception))
    self.assertIsInstance(err.exception.__context__, NameError)
    self.assertIn('non_existing_flush', str(err.exception.__context__))
    self.assertFalse(txt.closed)

    # Silence destructor error: txt is still open, so finalizing it
    # would run the failing hooks again outside any assertion.
    buffer.close = lambda: None
    txt.flush = lambda: None
2704
def test_multi_close(self):
    # Repeated close() calls are accepted; flush() on the closed wrapper
    # raises ValueError.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    for _ in range(3):
        txt.close()
    self.assertRaises(ValueError, txt.flush)
2711
def test_unseekable(self):
    # tell() and seek() on a wrapper around MockUnseekableIO must raise
    # UnsupportedOperation.
    wrapper = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    unsupported = self.UnsupportedOperation
    self.assertRaises(unsupported, wrapper.tell)
    self.assertRaises(unsupported, wrapper.seek, 0)
2716
def test_readonly_attributes(self):
    # Assigning to the wrapper's buffer attribute must be rejected.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    replacement = self.BytesIO(self.testdata)
    with self.assertRaises(AttributeError):
        wrapper.buffer = replacement
2722
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    reader = self.TextIOWrapper(self.MockRawIO(chunks),
                                encoding='ascii', newline='\n')
    # Reads
    self.assertEqual(reader.read(4), 'abcd')
    self.assertEqual(reader.readline(), 'efghi\n')
    self.assertEqual(list(reader), ['jkl\n', 'opq\n'])
2733
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    writer = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                                write_through=True)
    for piece in ('1', '23\n4', '5'):
        writer.write(piece)
    # No flush() was called, yet every byte is already on the raw object.
    self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2743
def test_bufio_write_through(self):
    # Issue #21396: write_through=True doesn't force a flush()
    # on the underlying binary buffered object.
    flush_called, write_called = [], []
    class RecordingWriter(self.BufferedWriter):
        # Notes every flush()/write() call before delegating to the base.
        def flush(self, *args, **kwargs):
            flush_called.append(True)
            return super().flush(*args, **kwargs)
        def write(self, *args, **kwargs):
            write_called.append(True)
            return super().write(*args, **kwargs)

    rawio = self.BytesIO()
    data = b"a"
    bufio = RecordingWriter(rawio, len(data)*2)
    textio = self.TextIOWrapper(bufio, encoding='ascii',
                                write_through=True)
    # write to the buffered io but don't overflow the buffer
    text = data.decode('ascii')
    textio.write(text)

    # buffer.flush is not called with write_through=True
    self.assertFalse(flush_called)
    # buffer.write *is* called with write_through=True
    self.assertTrue(write_called)
    self.assertEqual(rawio.getvalue(), b"")  # no flush

    del write_called[:]  # reset
    textio.write(text * 10)  # total content is larger than bufio buffer
    self.assertTrue(write_called)
    self.assertEqual(rawio.getvalue(), data * 11)  # all flushed
2775
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002776 def test_read_nonbytes(self):
2777 # Issue #17106
2778 # Crash when underlying read() returns non-bytes
2779 t = self.TextIOWrapper(self.StringIO('a'))
2780 self.assertRaises(TypeError, t.read, 1)
2781 t = self.TextIOWrapper(self.StringIO('a'))
2782 self.assertRaises(TypeError, t.readline)
2783 t = self.TextIOWrapper(self.StringIO('a'))
2784 self.assertRaises(TypeError, t.read)
2785
2786 def test_illegal_decoder(self):
2787 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10002788 # Bypass the early encoding check added in issue 20404
2789 def _make_illegal_wrapper():
2790 quopri = codecs.lookup("quopri")
2791 quopri._is_text_encoding = True
2792 try:
2793 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
2794 newline='\n', encoding="quopri")
2795 finally:
2796 quopri._is_text_encoding = False
2797 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002798 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10002799 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002800 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10002801 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002802 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10002803 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002804 self.assertRaises(TypeError, t.read)
2805
def _check_create_at_shutdown(self, **kwargs):
    """Run a child interpreter that builds a TextIOWrapper in ``__del__``.

    The child script imports the module named by ``self.io.__name__``,
    keeps one global instance whose ``__del__`` creates a TextIOWrapper
    with *kwargs* and then prints "ok".  The result of
    ``assert_python_ok`` is returned unchanged (callers unpack it as
    ``rc, out, err``).
    """
    # Issue #20037: creating a TextIOWrapper at shutdown
    # shouldn't crash the interpreter.
    template = """if 1:
            import codecs
            import {iomod} as io

            # Avoid looking up codecs at shutdown
            codecs.lookup('utf-8')

            class C:
                def __init__(self):
                    self.buf = io.BytesIO()
                def __del__(self):
                    io.TextIOWrapper(self.buf, **{kwargs})
                    print("ok")
            c = C()
            """
    script = template.format(iomod=self.io.__name__, kwargs=kwargs)
    return assert_python_ok("-c", script)
2826
2827 def test_create_at_shutdown_without_encoding(self):
2828 rc, out, err = self._check_create_at_shutdown()
2829 if err:
2830 # Can error out with a RuntimeError if the module state
2831 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10002832 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01002833 else:
2834 self.assertEqual("ok", out.decode().strip())
2835
2836 def test_create_at_shutdown_with_encoding(self):
2837 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
2838 errors='strict')
2839 self.assertFalse(err)
2840 self.assertEqual("ok", out.decode().strip())
2841
def test_read_byteslike(self):
    """TextIOWrapper must accept memoryview results from the wrapped read."""
    source = MemviewBytesIO(b'Just some random string\n')
    wrapper = self.TextIOWrapper(source, 'utf-8')

    # TextIOWrapper will not read the full string, because
    # we truncate it to a multiple of the native int size
    # so that we can construct a more complex memoryview.
    truncated = _to_memoryview(source.getvalue()).tobytes()
    expected = truncated.decode('utf-8')

    self.assertEqual(wrapper.read(200), expected)
2852
class MemviewBytesIO(io.BytesIO):
    '''BytesIO variant whose read() and read1() return the result of
    _to_memoryview() applied to the bytes read, rather than the bytes.'''

    def read1(self, len_):
        chunk = super().read1(len_)
        return _to_memoryview(chunk)

    def read(self, len_):
        chunk = super().read(len_)
        return _to_memoryview(chunk)
2862
def _to_memoryview(buf):
    '''Convert bytes-object *buf* to a non-trivial memoryview.

    The data is loaded into an ``array.array('i')``, so any trailing bytes
    that do not fill a whole array item are dropped.
    '''
    ints = array.array('i')
    whole_items = len(buf) // ints.itemsize
    ints.frombytes(buf[:whole_items * ints.itemsize])
    return memoryview(ints)
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002870
class CTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapperTest bound to the ``io`` module, plus extra checks."""
    io = io
    shutdown_error = "RuntimeError: could not find io module state"

    def test_initialization(self):
        """A failed re-__init__ leaves the wrapper unusable for reading."""
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        bad_newlines = ((TypeError, 42), (ValueError, 'xyzzy'))
        for expected_error, newline in bad_newlines:
            self.assertRaises(expected_error, wrapper.__init__, buffered,
                              newline=newline)
            self.assertRaises(ValueError, wrapper.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            raw_file = io.FileIO(support.TESTFN, "wb")
            buffered = self.BufferedWriter(raw_file)
            wrapper = self.TextIOWrapper(buffered, encoding="ascii")
            wrapper.write("456def")
            # Self-reference: only the cyclic collector can free the wrapper.
            wrapper.x = wrapper
            ref = weakref.ref(wrapper)
            del wrapper
            support.gc_collect()
        self.assertTrue(ref() is None, ref)
        with self.open(support.TESTFN, "rb") as stored:
            self.assertEqual(stored.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references
            text_a.buddy = text_b
            text_b.buddy = text_a
        support.gc_collect()
2914
2915
class PyTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapperTest bound to the ``pyio`` module."""
    io = pyio
    # Alternative expected message, left disabled in the original:
    #shutdown_error = "LookupError: unknown encoding: ascii"
    shutdown_error = "TypeError: 'NoneType' object is not iterable"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002920
2921
class IncrementalNewlineDecoderTest(unittest.TestCase):
    """Checks for ``IncrementalNewlineDecoder``.

    The class under test is read from ``self.IncrementalNewlineDecoder``,
    which this base class does not define itself.
    """

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def verify(data, expected, **kwargs):
            # We exercise getstate() / setstate() as well as decode():
            # decoding again from the saved state must give the same result.
            saved = decoder.getstate()
            for attempt in range(2):
                if attempt:
                    decoder.setstate(saved)
                self.assertEqual(decoder.decode(data, **kwargs), expected)

        verify(b'\xe8\xa2\x88', "\u8888")

        # The same character fed one byte at a time, twice in a row.
        split_char = ((b'\xe8', ""), (b'\xa2', ""), (b'\x88', "\u8888"))
        for _ in range(2):
            for data, expected in split_char:
                verify(data, expected)

        # A truncated sequence must be rejected once input is final.
        verify(b'\xe8', "")
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        newline_steps = (
            (b'\n', "\n", {}),
            (b'\r', "", {}),
            (b'', "\n", {'final': True}),
            (b'\r', "\n", {'final': True}),

            (b'\r', "", {}),
            (b'a', "\na", {}),

            (b'\r\r\n', "\n\n", {}),
            (b'\r', "", {}),
            (b'\r', "\n", {}),
            (b'\na', "\na", {}),

            (b'\xe8\xa2\x88\r\n', "\u8888\n", {}),
            (b'\xe8\xa2\x88', "\u8888", {}),
            (b'\n', "\n", {}),
            (b'\xe8\xa2\x88\r', "\u8888", {}),
            (b'\n', "\n", {}),
        )
        for data, expected, options in newline_steps:
            verify(data, expected, **options)

    def check_newline_decoding(self, decoder, encoding):
        """Feed text to *decoder* unit by unit and check ``newlines``.

        With an *encoding*, the text is encoded and fed one byte at a time;
        with ``None`` it is fed one character at a time.
        """
        pieces = []
        encoder = None
        if encoding is not None:
            encoder = codecs.getincrementalencoder(encoding)()

        def feed(text):
            if encoder is None:
                # Decode one char at a time
                units = text
            else:
                # Decode one byte at a time
                units = (bytes([byte]) for byte in encoder.encode(text))
            for unit in units:
                pieces.append(decoder.decode(unit))

        self.assertEqual(decoder.newlines, None)
        expectations = (
            ("abc\n\r", '\n'),
            ("\nabc", ('\n', '\r\n')),
            ("abc\r", ('\n', '\r\n')),
            ("abc", ('\r', '\n', '\r\n')),
        )
        for text, seen_newlines in expectations:
            feed(text)
            self.assertEqual(decoder.newlines, seen_newlines)
        feed("abc\r")
        self.assertEqual("".join(pieces), "abc\n\nabcabc\nabcabc")

        decoder.reset()
        sample = "abc"
        if encoder is not None:
            encoder.reset()
            sample = encoder.encode(sample)
        self.assertEqual(decoder.decode(sample), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            inner = codecs.getincrementaldecoder(enc)() if enc else enc
            wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(wrapped, enc)

        utf8_inner = codecs.getincrementaldecoder("utf-8")()
        utf8_wrapped = self.IncrementalNewlineDecoder(utf8_inner,
                                                      translate=True)
        self.check_newline_decoding_utf8(utf8_wrapped)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        for translate in (False, True):
            dec = self.IncrementalNewlineDecoder(None, translate=translate)
            self.assertEqual(dec.newlines, None)
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)
3027
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Adds nothing of its own to IncrementalNewlineDecoderTest."""
3030
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Adds nothing of its own to IncrementalNewlineDecoderTest."""
Antoine Pitrou180a3362008-12-14 16:36:46 +00003033
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003034
Guido van Rossum01a27522007-03-07 01:00:12 +00003035# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003036
class MiscIOTest(unittest.TestCase):
    """Miscellaneous checks of the io module's public interface.

    The methods read the implementation under test from attributes of
    ``self`` (``io``, ``open``, ``IOBase``, ``RawIOBase``, ``BufferedIOBase``,
    ``TextIOBase``, ``BlockingIOError``); subclasses set ``io``, the other
    attributes are not defined in this class.
    """

    def tearDown(self):
        support.unlink(support.TESTFN)

    def test___all__(self):
        """Every name in ``__all__`` exists and has the expected kind."""
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertIsNotNone(obj, name)
            if name == "open":
                continue
            elif "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase))

    def test_attributes(self):
        """Check ``name`` and ``mode`` on each layer of opened files.

        Files are opened in ``with`` blocks so that they are closed even
        when an assertion fails.
        """
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        with support.check_warnings(('', DeprecationWarning)):
            f = self.open(support.TESTFN, "U")
        with f:
            self.assertEqual(f.name, support.TESTFN)
            self.assertEqual(f.buffer.name, support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode, "U")
            self.assertEqual(f.buffer.mode, "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        with self.open(support.TESTFN, "w+") as f:
            self.assertEqual(f.mode, "w+")
            self.assertEqual(f.buffer.mode, "rb+")  # Does it really matter?
            self.assertEqual(f.buffer.raw.mode, "rb+")

            # A second file object over the same descriptor; closefd=False
            # leaves ownership of the descriptor with ``f``.
            with self.open(f.fileno(), "wb", closefd=False) as g:
                self.assertEqual(g.mode, "wb")
                self.assertEqual(g.raw.mode, "wb")
                self.assertEqual(g.name, f.fileno())
                self.assertEqual(g.raw.name, f.fileno())

    def test_io_after_close(self):
        """Operations on a closed file must raise ValueError."""
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            # Optional methods are only checked where the object has them.
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
            if hasattr(f, "readall"):
                self.assertRaises(ValueError, f.readall)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            if hasattr(f, "readinto1"):
                self.assertRaises(ValueError, f.readinto1, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            self.assertRaises(ValueError, f.write,
                              b"" if "b" in kwargs['mode'] else "")
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        class C(str):
            pass
        c = C("")
        b = self.BlockingIOError(1, c)
        # Reference cycle between the exception and its argument.
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        self.assertIsInstance(self.IOBase, abc.ABCMeta)
        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        """Check which of *abcmodule*'s base classes opened files belong to.

        *abcmodule* is any object exposing ``IOBase``, ``RawIOBase``,
        ``BufferedIOBase`` and ``TextIOBase`` attributes.
        """
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "wb") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "w") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertIsInstance(f, abcmodule.TextIOBase)

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)

    def _check_warn_on_dealloc(self, *args, **kwargs):
        """Check that dropping an unclosed file emits a ResourceWarning
        whose message contains the file's repr."""
        # NOTE(review): this calls the builtin open(), not self.open, so it
        # does not vary with the subclass's ``io`` -- confirm this is intended.
        f = open(*args, **kwargs)
        r = repr(f)
        with self.assertWarns(ResourceWarning) as cm:
            f = None
            support.gc_collect()
        self.assertIn(r, str(cm.warning.args[0]))

    def test_warn_on_dealloc(self):
        self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
        self._check_warn_on_dealloc(support.TESTFN, "wb")
        self._check_warn_on_dealloc(support.TESTFN, "w")

    def _check_warn_on_dealloc_fd(self, *args, **kwargs):
        """Same as _check_warn_on_dealloc, but for files opened from a pipe
        descriptor; with closefd=False no warning may be recorded."""
        fds = []
        def cleanup_fds():
            for fd in fds:
                try:
                    os.close(fd)
                except OSError as e:
                    # Descriptors already closed by the file object are fine.
                    if e.errno != errno.EBADF:
                        raise
        self.addCleanup(cleanup_fds)
        r, w = os.pipe()
        fds += r, w
        self._check_warn_on_dealloc(r, *args, **kwargs)
        # When using closefd=False, there's no warning
        r, w = os.pipe()
        fds += r, w
        with warnings.catch_warnings(record=True) as recorded:
            open(r, *args, closefd=False, **kwargs)
            support.gc_collect()
        self.assertEqual(recorded, [])

    def test_warn_on_dealloc_fd(self):
        self._check_warn_on_dealloc_fd("rb", buffering=0)
        self._check_warn_on_dealloc_fd("rb")
        self._check_warn_on_dealloc_fd("r")

    def test_pickling(self):
        # Pickling file objects is forbidden
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+b", "buffering": 0},
            ]:
            for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
                with self.open(support.TESTFN, **kwargs) as f:
                    self.assertRaises(TypeError, pickle.dumps, f, protocol)

    def test_nonblock_pipe_write_bigbuf(self):
        self._test_nonblock_pipe_write(16*1024)

    def test_nonblock_pipe_write_smallbuf(self):
        self._test_nonblock_pipe_write(1024)

    @unittest.skipUnless(hasattr(os, 'set_blocking'),
                         'os.set_blocking() required for this test')
    def _test_nonblock_pipe_write(self, bufsize):
        """Write through a non-blocking pipe with buffers of *bufsize* bytes
        and check that everything reported as written is read back."""
        sent = []
        received = []
        r, w = os.pipe()
        os.set_blocking(r, False)
        os.set_blocking(w, False)

        # To exercise all code paths in the C implementation we need
        # to play with buffer sizes.  For instance, if we choose a
        # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
        # then we will never get a partial write of the buffer.
        rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
        wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

        with rf, wf:
            for N in 9999, 73, 7574:
                try:
                    i = 0
                    while True:
                        msg = bytes([i % 26 + 97]) * N
                        sent.append(msg)
                        wf.write(msg)
                        i += 1

                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.args[2], e.characters_written)
                    # Keep only the part of the last message that was
                    # reported as written.
                    sent[-1] = sent[-1][:e.characters_written]
                    received.append(rf.read())
                    msg = b'BLOCKED'
                    wf.write(msg)
                    sent.append(msg)

            while True:
                try:
                    wf.flush()
                    break
                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.args[2], e.characters_written)
                    self.assertEqual(e.characters_written, 0)
                    received.append(rf.read())

            # Collect what remains until read() returns None.
            received += iter(rf.read, None)

        sent, received = b''.join(sent), b''.join(received)
        self.assertTrue(sent == received)
        self.assertTrue(wf.closed)
        self.assertTrue(rf.closed)

    def test_create_fail(self):
        # 'x' mode fails if file is existing
        with self.open(support.TESTFN, 'w'):
            pass
        self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')

    def test_create_writes(self):
        # 'x' mode opens for writing
        with self.open(support.TESTFN, 'xb') as f:
            f.write(b"spam")
        with self.open(support.TESTFN, 'rb') as f:
            self.assertEqual(b"spam", f.read())

    def test_open_allargs(self):
        # there used to be a buffer overflow in the parser for rawmode
        self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
3303
3304
class CMiscIOTest(MiscIOTest):
    """MiscIOTest bound to the ``io`` module, plus one extra check."""
    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025
        class OversizedReader(self.io.BufferedIOBase):
            """read() ignores the requested size and returns 10**6 bytes."""

            def read(self, n=-1):
                return b'x' * 10**6

        target = bytearray(2)
        reader = OversizedReader()
        with self.assertRaises(ValueError):
            reader.readinto(target)
3316
class PyMiscIOTest(MiscIOTest):
    """MiscIOTest bound to the ``pyio`` module."""
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003319
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003320
3321@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3322class SignalsTest(unittest.TestCase):
3323
def setUp(self):
    """Install alarm_interrupt as the SIGALRM handler and keep the
    previous handler in ``self.oldalrm``."""
    previous = signal.signal(signal.SIGALRM, self.alarm_interrupt)
    self.oldalrm = previous
3326
def tearDown(self):
    """Reinstall the SIGALRM handler stored in ``self.oldalrm``."""
    saved_handler = self.oldalrm
    signal.signal(signal.SIGALRM, saved_handler)
3329
def alarm_interrupt(self, sig, frame):
    """Signal handler that always raises ZeroDivisionError."""
    1 / 0
3332
@unittest.skipUnless(threading, 'Threading required for this test.')
def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
    """Check that a partial write, when it gets interrupted, properly
    invokes the signal handler, and bubbles up the exception raised
    in the latter.

    *item* is repeated to build a payload larger than
    ``support.PIPE_MAX_SIZE``; *bytes* holds the two bytes expected to be
    read back from the pipe.  Remaining keyword arguments are passed to
    ``self.io.open()``, with ``closefd`` forced to False.
    """
    read_results = []
    def _read():
        # Block SIGALRM in this thread where the platform allows it.
        if hasattr(signal, 'pthread_sigmask'):
            signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
        s = os.read(r, 1)
        read_results.append(s)
    t = threading.Thread(target=_read)
    t.daemon = True
    r, w = os.pipe()
    fdopen_kwargs["closefd"] = False
    # Bound before the try block so that the cleanup below cannot fail
    # with an unbound name (hiding the real error) if open() raises.
    wio = None
    try:
        wio = self.io.open(w, **fdopen_kwargs)
        t.start()
        # Fill the pipe enough that the write will be blocking.
        # It will be interrupted by the timer armed above.  Since the
        # other thread has read one byte, the low-level write will
        # return with a successful (partial) result rather than an EINTR.
        # The buffered IO layer must check for pending signal
        # handlers, which in this case will invoke alarm_interrupt().
        signal.alarm(1)
        try:
            self.assertRaises(ZeroDivisionError,
                        wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
        finally:
            signal.alarm(0)
        t.join()
        # We got one byte, get another one and check that it isn't a
        # repeat of the first one.
        read_results.append(os.read(r, 1))
        self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
    finally:
        os.close(w)
        os.close(r)
        # This is deliberate. If we didn't close the file descriptor
        # before closing wio, wio would try to flush its internal
        # buffer, and block again.
        if wio is not None:
            try:
                wio.close()
            except OSError as e:
                if e.errno != errno.EBADF:
                    raise
3379
3380 def test_interrupted_write_unbuffered(self):
3381 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3382
3383 def test_interrupted_write_buffered(self):
3384 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3385
3386 def test_interrupted_write_text(self):
3387 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3388
@support.no_tracing
def check_reentrant_write(self, data, **fdopen_kwargs):
    """Check write() when a signal handler calls write() on the same file.

    *data* is written repeatedly to a pipe opened with
    ``self.io.open(w, **fdopen_kwargs)`` until the SIGALRM handler, which
    itself writes *data* to that file, makes the loop raise.
    """
    def on_alarm(*args):
        # Will be called reentrantly from the same thread
        wio.write(data)
        1/0
    signal.signal(signal.SIGALRM, on_alarm)
    r, w = os.pipe()
    wio = self.io.open(w, **fdopen_kwargs)
    try:
        signal.alarm(1)
        # Either the reentrant call to wio.write() fails with RuntimeError,
        # or the signal handler raises ZeroDivisionError.
        with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
            while 1:
                for i in range(100):
                    wio.write(data)
                    wio.flush()
                # Make sure the buffer doesn't fill up and block further writes
                os.read(r, len(data) * 100)
        exc = cm.exception
        if isinstance(exc, RuntimeError):
            self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
    finally:
        # Cancel any alarm still pending (e.g. when the loop failed for
        # another reason) so that it cannot fire after this check is over.
        signal.alarm(0)
        wio.close()
        os.close(r)
3415
3416 def test_reentrant_write_buffered(self):
3417 self.check_reentrant_write(b"xy", mode="wb")
3418
3419 def test_reentrant_write_text(self):
3420 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3421
def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
    """Check that a buffered read, when it gets interrupted (either
    returning a partial result or EINTR), properly invokes the signal
    handler and retries if the latter returned successfully.

    *decode* converts the value returned by ``read(6)`` to ``str`` for the
    comparison.  Remaining keyword arguments are passed to
    ``self.io.open()``, with ``closefd`` forced to False.
    """
    r, w = os.pipe()
    fdopen_kwargs["closefd"] = False
    def alarm_handler(sig, frame):
        os.write(w, b"bar")
    signal.signal(signal.SIGALRM, alarm_handler)
    # Bound before the try block so that the cleanup below cannot fail
    # with an unbound name (hiding the real error) if open() raises.
    rio = None
    try:
        rio = self.io.open(r, **fdopen_kwargs)
        os.write(w, b"foo")
        signal.alarm(1)
        # Expected behaviour:
        # - first raw read() returns partial b"foo"
        # - second raw read() returns EINTR
        # - third raw read() returns b"bar"
        self.assertEqual(decode(rio.read(6)), "foobar")
    finally:
        # Cancel any alarm still pending so that the handler cannot write
        # to the pipe after its descriptors have been closed.
        signal.alarm(0)
        if rio is not None:
            rio.close()
        os.close(w)
        os.close(r)
3444
Antoine Pitrou20db5112011-08-19 20:32:34 +02003445 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003446 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3447 mode="rb")
3448
Antoine Pitrou20db5112011-08-19 20:32:34 +02003449 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003450 self.check_interrupted_read_retry(lambda x: x,
3451 mode="r")
3452
@unittest.skipUnless(threading, 'Threading required for this test.')
def check_interrupted_write_retry(self, item, **fdopen_kwargs):
    """Check that a buffered write, when it gets interrupted (either
    returning a partial result or EINTR), properly invokes the signal
    handler and retries if the latter returned successfully.

    *item* is repeated ``N`` times to build the payload, and the test
    expects ``write()`` to report ``N`` units written.  *fdopen_kwargs*
    are passed on to ``self.io.open()``; ``closefd`` is forced to
    ``False`` because the pipe descriptors are closed explicitly here.
    """
    select = support.import_module("select")
    # A quantity that exceeds the buffer size of an anonymous pipe's
    # write end.
    N = support.PIPE_MAX_SIZE
    r, w = os.pipe()
    fdopen_kwargs["closefd"] = False
    # We need a separate thread to read from the pipe and allow the
    # write() to finish.  This thread is started after the SIGALRM is
    # received (forcing a first EINTR in write()).
    read_results = []
    write_finished = False

    def _read():
        # Drain the pipe until the writer signals completion.
        while not write_finished:
            while r in select.select([r], [], [], 1.0)[0]:
                s = os.read(r, 1024)
                read_results.append(s)

    t = threading.Thread(target=_read)
    t.daemon = True

    def alarm1(sig, frame):
        # First alarm: re-arm with the second handler.
        signal.signal(signal.SIGALRM, alarm2)
        signal.alarm(1)

    def alarm2(sig, frame):
        # Second alarm: start draining the pipe so the write can finish.
        t.start()

    signal.signal(signal.SIGALRM, alarm1)
    # Bind the name up front so the cleanup below cannot fail with
    # UnboundLocalError (and hide the real error) if open() raises.
    wio = None
    try:
        wio = self.io.open(w, **fdopen_kwargs)
        signal.alarm(1)
        # Expected behaviour:
        # - first raw write() is partial (because of the limited pipe buffer
        #   and the first alarm)
        # - second raw write() returns EINTR (because of the second alarm)
        # - subsequent write()s are successful (either partial or complete)
        self.assertEqual(N, wio.write(item * N))
        wio.flush()
        write_finished = True
        t.join()
        self.assertEqual(N, sum(len(x) for x in read_results))
    finally:
        # Cancel any alarm that has not fired yet so that no handler runs
        # (and possibly starts the reader thread) after the pipe is closed.
        signal.alarm(0)
        write_finished = True
        os.close(w)
        os.close(r)
        # This is deliberate. If we didn't close the file descriptor
        # before closing wio, wio would try to flush its internal
        # buffer, and could block (in case of failure).
        if wio is not None:
            try:
                wio.close()
            except OSError as e:
                if e.errno != errno.EBADF:
                    raise
3507
Antoine Pitrou20db5112011-08-19 20:32:34 +02003508 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003509 self.check_interrupted_write_retry(b"x", mode="wb")
3510
Antoine Pitrou20db5112011-08-19 20:32:34 +02003511 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003512 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3513
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003514
class CSignalsTest(SignalsTest):
    """Run the SignalsTest checks against the ``io`` module."""

    # The inherited checks open files through ``self.io``.
    io = io
3517
class PySignalsTest(SignalsTest):
    """Run the SignalsTest checks against the ``pyio`` module."""

    # The inherited checks open files through ``self.io``.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = test_reentrant_write_text = None
3525
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003526
def load_tests(*args):
    """Build the test suite for this module.

    Before collecting the tests, the names exported by the implementation
    under test (``io`` for classes whose name starts with "C", ``pyio`` for
    those starting with "Py"), together with the matching mock classes, are
    set as attributes on each test class.  Classes whose name has neither
    prefix are left untouched.

    The positional arguments are accepted and ignored.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name: getattr(io, name) for name in all_members}
    py_io_ns = {name: getattr(pyio, name) for name in all_members}
    globs = globals()
    # Each mock has a "C"- and a "Py"-prefixed variant at module level; expose
    # the right variant under the unprefixed name.
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            for name, obj in c_io_ns.items():
                setattr(test, name, obj)
        elif test.__name__.startswith("Py"):
            for name, obj in py_io_ns.items():
                setattr(test, name, obj)

    # unittest.makeSuite() is deprecated (and removed in Python 3.13); a
    # default TestLoader collects exactly the same tests.
    loader = unittest.TestLoader()
    suite = unittest.TestSuite(
        [loader.loadTestsFromTestCase(test) for test in tests])
    return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00003562
# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()