blob: 663db961df5fa9b6f1e35eb151110af18d836c4d [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Antoine Pitrou712cb732013-12-21 15:51:54 +010038from test.script_helper import assert_python_ok
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000047
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000048def _default_chunk_size():
49 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000050 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000051 return f._CHUNK_SIZE
52
53
Antoine Pitrou328ec742010-09-14 18:37:24 +000054class MockRawIOWithoutRead:
55 """A RawIO implementation without read(), so as to exercise the default
56 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000057
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000058 def __init__(self, read_stack=()):
59 self._read_stack = list(read_stack)
60 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000061 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000062 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000063
Guido van Rossum01a27522007-03-07 01:00:12 +000064 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000065 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000066 return len(b)
67
68 def writable(self):
69 return True
70
Guido van Rossum68bbcd22007-02-27 17:19:33 +000071 def fileno(self):
72 return 42
73
74 def readable(self):
75 return True
76
Guido van Rossum01a27522007-03-07 01:00:12 +000077 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000078 return True
79
Guido van Rossum01a27522007-03-07 01:00:12 +000080 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000081 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000082
83 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000084 return 0 # same comment as above
85
86 def readinto(self, buf):
87 self._reads += 1
88 max_len = len(buf)
89 try:
90 data = self._read_stack[0]
91 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +000092 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000093 return 0
94 if data is None:
95 del self._read_stack[0]
96 return None
97 n = len(data)
98 if len(data) <= max_len:
99 del self._read_stack[0]
100 buf[:n] = data
101 return n
102 else:
103 buf[:] = data[:max_len]
104 self._read_stack[0] = data[max_len:]
105 return max_len
106
107 def truncate(self, pos=None):
108 return pos
109
Antoine Pitrou328ec742010-09-14 18:37:24 +0000110class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
111 pass
112
113class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
114 pass
115
116
117class MockRawIO(MockRawIOWithoutRead):
118
119 def read(self, n=None):
120 self._reads += 1
121 try:
122 return self._read_stack.pop(0)
123 except:
124 self._extraneous_reads += 1
125 return b""
126
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000127class CMockRawIO(MockRawIO, io.RawIOBase):
128 pass
129
130class PyMockRawIO(MockRawIO, pyio.RawIOBase):
131 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000132
Guido van Rossuma9e20242007-03-08 00:43:48 +0000133
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000134class MisbehavedRawIO(MockRawIO):
135 def write(self, b):
136 return super().write(b) * 2
137
138 def read(self, n=None):
139 return super().read(n) * 2
140
141 def seek(self, pos, whence):
142 return -123
143
144 def tell(self):
145 return -456
146
147 def readinto(self, buf):
148 super().readinto(buf)
149 return len(buf) * 5
150
151class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
152 pass
153
154class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
155 pass
156
157
158class CloseFailureIO(MockRawIO):
159 closed = 0
160
161 def close(self):
162 if not self.closed:
163 self.closed = 1
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200164 raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000165
166class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
167 pass
168
169class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
170 pass
171
172
173class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000174
175 def __init__(self, data):
176 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000177 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000178
179 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000180 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000181 self.read_history.append(None if res is None else len(res))
182 return res
183
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000184 def readinto(self, b):
185 res = super().readinto(b)
186 self.read_history.append(res)
187 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000188
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000189class CMockFileIO(MockFileIO, io.BytesIO):
190 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000191
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000192class PyMockFileIO(MockFileIO, pyio.BytesIO):
193 pass
194
195
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000196class MockUnseekableIO:
197 def seekable(self):
198 return False
199
200 def seek(self, *args):
201 raise self.UnsupportedOperation("not seekable")
202
203 def tell(self, *args):
204 raise self.UnsupportedOperation("not seekable")
205
206class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
207 UnsupportedOperation = io.UnsupportedOperation
208
209class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
210 UnsupportedOperation = pyio.UnsupportedOperation
211
212
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000213class MockNonBlockWriterIO:
214
215 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000216 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000217 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000218
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000219 def pop_written(self):
220 s = b"".join(self._write_stack)
221 self._write_stack[:] = []
222 return s
223
224 def block_on(self, char):
225 """Block when a given char is encountered."""
226 self._blocker_char = char
227
228 def readable(self):
229 return True
230
231 def seekable(self):
232 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000233
Guido van Rossum01a27522007-03-07 01:00:12 +0000234 def writable(self):
235 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000236
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000237 def write(self, b):
238 b = bytes(b)
239 n = -1
240 if self._blocker_char:
241 try:
242 n = b.index(self._blocker_char)
243 except ValueError:
244 pass
245 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100246 if n > 0:
247 # write data up to the first blocker
248 self._write_stack.append(b[:n])
249 return n
250 else:
251 # cancel blocker and indicate would block
252 self._blocker_char = None
253 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000254 self._write_stack.append(b)
255 return len(b)
256
257class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
258 BlockingIOError = io.BlockingIOError
259
260class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
261 BlockingIOError = pyio.BlockingIOError
262
Guido van Rossuma9e20242007-03-08 00:43:48 +0000263
Guido van Rossum28524c72007-02-27 05:47:44 +0000264class IOTest(unittest.TestCase):
265
Neal Norwitze7789b12008-03-24 06:18:09 +0000266 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000267 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000268
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000269 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000270 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000271
Guido van Rossum28524c72007-02-27 05:47:44 +0000272 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000273 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000274 f.truncate(0)
275 self.assertEqual(f.tell(), 5)
276 f.seek(0)
277
278 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000279 self.assertEqual(f.seek(0), 0)
280 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000281 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000282 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000283 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000284 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000285 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000286 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000287 self.assertEqual(f.seek(-1, 2), 13)
288 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000289
Guido van Rossum87429772007-04-10 21:06:59 +0000290 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000291 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000292 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000293
Guido van Rossum9b76da62007-04-11 01:09:03 +0000294 def read_ops(self, f, buffered=False):
295 data = f.read(5)
296 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000297 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000298 self.assertEqual(f.readinto(data), 5)
299 self.assertEqual(data, b" worl")
300 self.assertEqual(f.readinto(data), 2)
301 self.assertEqual(len(data), 5)
302 self.assertEqual(data[:2], b"d\n")
303 self.assertEqual(f.seek(0), 0)
304 self.assertEqual(f.read(20), b"hello world\n")
305 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000306 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000307 self.assertEqual(f.seek(-6, 2), 6)
308 self.assertEqual(f.read(5), b"world")
309 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000310 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000311 self.assertEqual(f.seek(-6, 1), 5)
312 self.assertEqual(f.read(5), b" worl")
313 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000314 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000315 if buffered:
316 f.seek(0)
317 self.assertEqual(f.read(), b"hello world\n")
318 f.seek(6)
319 self.assertEqual(f.read(), b"world\n")
320 self.assertEqual(f.read(), b"")
321
Guido van Rossum34d69e52007-04-10 20:08:41 +0000322 LARGE = 2**31
323
Guido van Rossum53807da2007-04-10 19:01:47 +0000324 def large_file_ops(self, f):
325 assert f.readable()
326 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000327 self.assertEqual(f.seek(self.LARGE), self.LARGE)
328 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000329 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000330 self.assertEqual(f.tell(), self.LARGE + 3)
331 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000332 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000333 self.assertEqual(f.tell(), self.LARGE + 2)
334 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000335 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000336 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000337 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
338 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000339 self.assertEqual(f.read(2), b"x")
340
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000341 def test_invalid_operations(self):
342 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000343 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000344 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000345 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000346 self.assertRaises(exc, fp.read)
347 self.assertRaises(exc, fp.readline)
348 with self.open(support.TESTFN, "wb", buffering=0) as fp:
349 self.assertRaises(exc, fp.read)
350 self.assertRaises(exc, fp.readline)
351 with self.open(support.TESTFN, "rb", buffering=0) as fp:
352 self.assertRaises(exc, fp.write, b"blah")
353 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000354 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000355 self.assertRaises(exc, fp.write, b"blah")
356 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000357 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000358 self.assertRaises(exc, fp.write, "blah")
359 self.assertRaises(exc, fp.writelines, ["blah\n"])
360 # Non-zero seeking from current or end pos
361 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
362 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000363
Antoine Pitrou13348842012-01-29 18:36:34 +0100364 def test_open_handles_NUL_chars(self):
365 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300366 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
367 self.assertRaises(ValueError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100368
Guido van Rossum28524c72007-02-27 05:47:44 +0000369 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000370 with self.open(support.TESTFN, "wb", buffering=0) as f:
371 self.assertEqual(f.readable(), False)
372 self.assertEqual(f.writable(), True)
373 self.assertEqual(f.seekable(), True)
374 self.write_ops(f)
375 with self.open(support.TESTFN, "rb", buffering=0) as f:
376 self.assertEqual(f.readable(), True)
377 self.assertEqual(f.writable(), False)
378 self.assertEqual(f.seekable(), True)
379 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000380
Guido van Rossum87429772007-04-10 21:06:59 +0000381 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000382 with self.open(support.TESTFN, "wb") as f:
383 self.assertEqual(f.readable(), False)
384 self.assertEqual(f.writable(), True)
385 self.assertEqual(f.seekable(), True)
386 self.write_ops(f)
387 with self.open(support.TESTFN, "rb") as f:
388 self.assertEqual(f.readable(), True)
389 self.assertEqual(f.writable(), False)
390 self.assertEqual(f.seekable(), True)
391 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000392
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000393 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000394 with self.open(support.TESTFN, "wb") as f:
395 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
396 with self.open(support.TESTFN, "rb") as f:
397 self.assertEqual(f.readline(), b"abc\n")
398 self.assertEqual(f.readline(10), b"def\n")
399 self.assertEqual(f.readline(2), b"xy")
400 self.assertEqual(f.readline(4), b"zzy\n")
401 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000402 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000403 self.assertRaises(TypeError, f.readline, 5.3)
404 with self.open(support.TESTFN, "r") as f:
405 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000406
Guido van Rossum28524c72007-02-27 05:47:44 +0000407 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000408 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000409 self.write_ops(f)
410 data = f.getvalue()
411 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000412 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000413 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000414
Guido van Rossum53807da2007-04-10 19:01:47 +0000415 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000416 # On Windows and Mac OSX this test comsumes large resources; It takes
417 # a long time to build the >2GB file and takes >2GB of disk space
418 # therefore the resource must be enabled to run this test.
419 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600420 support.requires(
421 'largefile',
422 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000423 with self.open(support.TESTFN, "w+b", 0) as f:
424 self.large_file_ops(f)
425 with self.open(support.TESTFN, "w+b") as f:
426 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000427
428 def test_with_open(self):
429 for bufsize in (0, 1, 100):
430 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000431 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000432 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000433 self.assertEqual(f.closed, True)
434 f = None
435 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000436 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000437 1/0
438 except ZeroDivisionError:
439 self.assertEqual(f.closed, True)
440 else:
441 self.fail("1/0 didn't raise an exception")
442
Antoine Pitrou08838b62009-01-21 00:55:13 +0000443 # issue 5008
444 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000445 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000446 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000447 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000448 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000449 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000450 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000451 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000452 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000453
Guido van Rossum87429772007-04-10 21:06:59 +0000454 def test_destructor(self):
455 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000456 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000457 def __del__(self):
458 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000459 try:
460 f = super().__del__
461 except AttributeError:
462 pass
463 else:
464 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000465 def close(self):
466 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000467 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000468 def flush(self):
469 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000470 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000471 with support.check_warnings(('', ResourceWarning)):
472 f = MyFileIO(support.TESTFN, "wb")
473 f.write(b"xxx")
474 del f
475 support.gc_collect()
476 self.assertEqual(record, [1, 2, 3])
477 with self.open(support.TESTFN, "rb") as f:
478 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000479
480 def _check_base_destructor(self, base):
481 record = []
482 class MyIO(base):
483 def __init__(self):
484 # This exercises the availability of attributes on object
485 # destruction.
486 # (in the C version, close() is called by the tp_dealloc
487 # function, not by __del__)
488 self.on_del = 1
489 self.on_close = 2
490 self.on_flush = 3
491 def __del__(self):
492 record.append(self.on_del)
493 try:
494 f = super().__del__
495 except AttributeError:
496 pass
497 else:
498 f()
499 def close(self):
500 record.append(self.on_close)
501 super().close()
502 def flush(self):
503 record.append(self.on_flush)
504 super().flush()
505 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000506 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000507 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000508 self.assertEqual(record, [1, 2, 3])
509
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000510 def test_IOBase_destructor(self):
511 self._check_base_destructor(self.IOBase)
512
513 def test_RawIOBase_destructor(self):
514 self._check_base_destructor(self.RawIOBase)
515
516 def test_BufferedIOBase_destructor(self):
517 self._check_base_destructor(self.BufferedIOBase)
518
519 def test_TextIOBase_destructor(self):
520 self._check_base_destructor(self.TextIOBase)
521
Guido van Rossum87429772007-04-10 21:06:59 +0000522 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000523 with self.open(support.TESTFN, "wb") as f:
524 f.write(b"xxx")
525 with self.open(support.TESTFN, "rb") as f:
526 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000527
Guido van Rossumd4103952007-04-12 05:44:49 +0000528 def test_array_writes(self):
529 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000530 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000531 with self.open(support.TESTFN, "wb", 0) as f:
532 self.assertEqual(f.write(a), n)
533 with self.open(support.TESTFN, "wb") as f:
534 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000535
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000536 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000537 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000538 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000539
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000540 def test_read_closed(self):
541 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000542 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000543 with self.open(support.TESTFN, "r") as f:
544 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000545 self.assertEqual(file.read(), "egg\n")
546 file.seek(0)
547 file.close()
548 self.assertRaises(ValueError, file.read)
549
550 def test_no_closefd_with_filename(self):
551 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000552 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000553
554 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000555 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000556 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000557 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000558 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000559 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000560 self.assertEqual(file.buffer.raw.closefd, False)
561
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000562 def test_garbage_collection(self):
563 # FileIO objects are collected, and collecting them flushes
564 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000565 with support.check_warnings(('', ResourceWarning)):
566 f = self.FileIO(support.TESTFN, "wb")
567 f.write(b"abcxxx")
568 f.f = f
569 wr = weakref.ref(f)
570 del f
571 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000572 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000573 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000574 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000575
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000576 def test_unbounded_file(self):
577 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
578 zero = "/dev/zero"
579 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000580 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000581 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000582 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000583 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000584 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000585 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000586 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000587 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000588 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000589 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000590 self.assertRaises(OverflowError, f.read)
591
Antoine Pitrou6be88762010-05-03 16:48:20 +0000592 def test_flush_error_on_close(self):
593 f = self.open(support.TESTFN, "wb", buffering=0)
594 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200595 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000596 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200597 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600598 self.assertTrue(f.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000599
600 def test_multi_close(self):
601 f = self.open(support.TESTFN, "wb", buffering=0)
602 f.close()
603 f.close()
604 f.close()
605 self.assertRaises(ValueError, f.flush)
606
Antoine Pitrou328ec742010-09-14 18:37:24 +0000607 def test_RawIOBase_read(self):
608 # Exercise the default RawIOBase.read() implementation (which calls
609 # readinto() internally).
610 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
611 self.assertEqual(rawio.read(2), b"ab")
612 self.assertEqual(rawio.read(2), b"c")
613 self.assertEqual(rawio.read(2), b"d")
614 self.assertEqual(rawio.read(2), None)
615 self.assertEqual(rawio.read(2), b"ef")
616 self.assertEqual(rawio.read(2), b"g")
617 self.assertEqual(rawio.read(2), None)
618 self.assertEqual(rawio.read(2), b"")
619
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400620 def test_types_have_dict(self):
621 test = (
622 self.IOBase(),
623 self.RawIOBase(),
624 self.TextIOBase(),
625 self.StringIO(),
626 self.BytesIO()
627 )
628 for obj in test:
629 self.assertTrue(hasattr(obj, "__dict__"))
630
Ross Lagerwall59142db2011-10-31 20:34:46 +0200631 def test_opener(self):
632 with self.open(support.TESTFN, "w") as f:
633 f.write("egg\n")
634 fd = os.open(support.TESTFN, os.O_RDONLY)
635 def opener(path, flags):
636 return fd
637 with self.open("non-existent", "r", opener=opener) as f:
638 self.assertEqual(f.read(), "egg\n")
639
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200640 def test_fileio_closefd(self):
641 # Issue #4841
642 with self.open(__file__, 'rb') as f1, \
643 self.open(__file__, 'rb') as f2:
644 fileio = self.FileIO(f1.fileno(), closefd=False)
645 # .__init__() must not close f1
646 fileio.__init__(f2.fileno(), closefd=False)
647 f1.readline()
648 # .close() must not close f2
649 fileio.close()
650 f2.readline()
651
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300652 def test_nonbuffered_textio(self):
653 with warnings.catch_warnings(record=True) as recorded:
654 with self.assertRaises(ValueError):
655 self.open(support.TESTFN, 'w', buffering=0)
656 support.gc_collect()
657 self.assertEqual(recorded, [])
658
659 def test_invalid_newline(self):
660 with warnings.catch_warnings(record=True) as recorded:
661 with self.assertRaises(ValueError):
662 self.open(support.TESTFN, 'w', newline='invalid')
663 support.gc_collect()
664 self.assertEqual(recorded, [])
665
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200666
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000667class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +0200668
669 def test_IOBase_finalize(self):
670 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
671 # class which inherits IOBase and an object of this class are caught
672 # in a reference cycle and close() is already in the method cache.
673 class MyIO(self.IOBase):
674 def close(self):
675 pass
676
677 # create an instance to populate the method cache
678 MyIO()
679 obj = MyIO()
680 obj.obj = obj
681 wr = weakref.ref(obj)
682 del MyIO
683 del obj
684 support.gc_collect()
685 self.assertTrue(wr() is None, wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000686
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000687class PyIOTest(IOTest):
688 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000689
Guido van Rossuma9e20242007-03-08 00:43:48 +0000690
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000691class CommonBufferedTests:
692 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
693
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000694 def test_detach(self):
695 raw = self.MockRawIO()
696 buf = self.tp(raw)
697 self.assertIs(buf.detach(), raw)
698 self.assertRaises(ValueError, buf.detach)
699
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000700 def test_fileno(self):
701 rawio = self.MockRawIO()
702 bufio = self.tp(rawio)
703
Ezio Melottib3aedd42010-11-20 19:04:17 +0000704 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000705
Zachary Ware9fe6d862013-12-08 00:20:35 -0600706 @unittest.skip('test having existential crisis')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000707 def test_no_fileno(self):
708 # XXX will we always have fileno() function? If so, kill
709 # this test. Else, write it.
710 pass
711
712 def test_invalid_args(self):
713 rawio = self.MockRawIO()
714 bufio = self.tp(rawio)
715 # Invalid whence
716 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +0200717 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000718
719 def test_override_destructor(self):
720 tp = self.tp
721 record = []
722 class MyBufferedIO(tp):
723 def __del__(self):
724 record.append(1)
725 try:
726 f = super().__del__
727 except AttributeError:
728 pass
729 else:
730 f()
731 def close(self):
732 record.append(2)
733 super().close()
734 def flush(self):
735 record.append(3)
736 super().flush()
737 rawio = self.MockRawIO()
738 bufio = MyBufferedIO(rawio)
739 writable = bufio.writable()
740 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000741 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000742 if writable:
743 self.assertEqual(record, [1, 2, 3])
744 else:
745 self.assertEqual(record, [1, 2])
746
747 def test_context_manager(self):
748 # Test usability as a context manager
749 rawio = self.MockRawIO()
750 bufio = self.tp(rawio)
751 def _with():
752 with bufio:
753 pass
754 _with()
755 # bufio should now be closed, and using it a second time should raise
756 # a ValueError.
757 self.assertRaises(ValueError, _with)
758
759 def test_error_through_destructor(self):
760 # Test that the exception state is not modified by a destructor,
761 # even if close() fails.
762 rawio = self.CloseFailureIO()
763 def f():
764 self.tp(rawio).xyzzy
765 with support.captured_output("stderr") as s:
766 self.assertRaises(AttributeError, f)
767 s = s.getvalue().strip()
768 if s:
769 # The destructor *may* have printed an unraisable error, check it
770 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200771 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000772 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000773
Antoine Pitrou716c4442009-05-23 19:04:03 +0000774 def test_repr(self):
775 raw = self.MockRawIO()
776 b = self.tp(raw)
Serhiy Storchaka521e5862014-07-22 15:00:37 +0300777 clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
Antoine Pitrou716c4442009-05-23 19:04:03 +0000778 self.assertEqual(repr(b), "<%s>" % clsname)
779 raw.name = "dummy"
780 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
781 raw.name = b"dummy"
782 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
783
Antoine Pitrou6be88762010-05-03 16:48:20 +0000784 def test_flush_error_on_close(self):
785 raw = self.MockRawIO()
786 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200787 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000788 raw.flush = bad_flush
789 b = self.tp(raw)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200790 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600791 self.assertTrue(b.closed)
792
793 def test_close_error_on_close(self):
794 raw = self.MockRawIO()
795 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200796 raise OSError('flush')
Benjamin Peterson68623612012-12-20 11:53:11 -0600797 def bad_close():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200798 raise OSError('close')
Benjamin Peterson68623612012-12-20 11:53:11 -0600799 raw.close = bad_close
800 b = self.tp(raw)
801 b.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200802 with self.assertRaises(OSError) as err: # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600803 b.close()
804 self.assertEqual(err.exception.args, ('close',))
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +0300805 self.assertIsInstance(err.exception.__context__, OSError)
Benjamin Peterson68623612012-12-20 11:53:11 -0600806 self.assertEqual(err.exception.__context__.args, ('flush',))
807 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000808
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +0300809 def test_nonnormalized_close_error_on_close(self):
810 # Issue #21677
811 raw = self.MockRawIO()
812 def bad_flush():
813 raise non_existing_flush
814 def bad_close():
815 raise non_existing_close
816 raw.close = bad_close
817 b = self.tp(raw)
818 b.flush = bad_flush
819 with self.assertRaises(NameError) as err: # exception not swallowed
820 b.close()
821 self.assertIn('non_existing_close', str(err.exception))
822 self.assertIsInstance(err.exception.__context__, NameError)
823 self.assertIn('non_existing_flush', str(err.exception.__context__))
824 self.assertFalse(b.closed)
825
Antoine Pitrou6be88762010-05-03 16:48:20 +0000826 def test_multi_close(self):
827 raw = self.MockRawIO()
828 b = self.tp(raw)
829 b.close()
830 b.close()
831 b.close()
832 self.assertRaises(ValueError, b.flush)
833
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000834 def test_unseekable(self):
835 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
836 self.assertRaises(self.UnsupportedOperation, bufio.tell)
837 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
838
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000839 def test_readonly_attributes(self):
840 raw = self.MockRawIO()
841 buf = self.tp(raw)
842 x = self.MockRawIO()
843 with self.assertRaises(AttributeError):
844 buf.raw = x
845
Guido van Rossum78892e42007-04-06 17:31:18 +0000846
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200847class SizeofTest:
848
849 @support.cpython_only
850 def test_sizeof(self):
851 bufsize1 = 4096
852 bufsize2 = 8192
853 rawio = self.MockRawIO()
854 bufio = self.tp(rawio, buffer_size=bufsize1)
855 size = sys.getsizeof(bufio) - bufsize1
856 rawio = self.MockRawIO()
857 bufio = self.tp(rawio, buffer_size=bufsize2)
858 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
859
Jesus Ceadc469452012-10-04 12:37:56 +0200860 @support.cpython_only
861 def test_buffer_freeing(self) :
862 bufsize = 4096
863 rawio = self.MockRawIO()
864 bufio = self.tp(rawio, buffer_size=bufsize)
865 size = sys.getsizeof(bufio) - bufsize
866 bufio.close()
867 self.assertEqual(sys.getsizeof(bufio), size)
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200868
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000869class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
870 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000871
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000872 def test_constructor(self):
873 rawio = self.MockRawIO([b"abc"])
874 bufio = self.tp(rawio)
875 bufio.__init__(rawio)
876 bufio.__init__(rawio, buffer_size=1024)
877 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000878 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000879 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
880 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
881 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
882 rawio = self.MockRawIO([b"abc"])
883 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000884 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000885
Serhiy Storchaka61e24932014-02-12 10:52:35 +0200886 def test_uninitialized(self):
887 bufio = self.tp.__new__(self.tp)
888 del bufio
889 bufio = self.tp.__new__(self.tp)
890 self.assertRaisesRegex((ValueError, AttributeError),
891 'uninitialized|has no attribute',
892 bufio.read, 0)
893 bufio.__init__(self.MockRawIO())
894 self.assertEqual(bufio.read(0), b'')
895
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000896 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000897 for arg in (None, 7):
898 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
899 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000900 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000901 # Invalid args
902 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000903
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000904 def test_read1(self):
905 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
906 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000907 self.assertEqual(b"a", bufio.read(1))
908 self.assertEqual(b"b", bufio.read1(1))
909 self.assertEqual(rawio._reads, 1)
910 self.assertEqual(b"c", bufio.read1(100))
911 self.assertEqual(rawio._reads, 1)
912 self.assertEqual(b"d", bufio.read1(100))
913 self.assertEqual(rawio._reads, 2)
914 self.assertEqual(b"efg", bufio.read1(100))
915 self.assertEqual(rawio._reads, 3)
916 self.assertEqual(b"", bufio.read1(100))
917 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000918 # Invalid args
919 self.assertRaises(ValueError, bufio.read1, -1)
920
921 def test_readinto(self):
922 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
923 bufio = self.tp(rawio)
924 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000925 self.assertEqual(bufio.readinto(b), 2)
926 self.assertEqual(b, b"ab")
927 self.assertEqual(bufio.readinto(b), 2)
928 self.assertEqual(b, b"cd")
929 self.assertEqual(bufio.readinto(b), 2)
930 self.assertEqual(b, b"ef")
931 self.assertEqual(bufio.readinto(b), 1)
932 self.assertEqual(b, b"gf")
933 self.assertEqual(bufio.readinto(b), 0)
934 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +0200935 rawio = self.MockRawIO((b"abc", None))
936 bufio = self.tp(rawio)
937 self.assertEqual(bufio.readinto(b), 2)
938 self.assertEqual(b, b"ab")
939 self.assertEqual(bufio.readinto(b), 1)
940 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000941
Benjamin Petersona96fea02014-06-22 14:17:44 -0700942 def test_readinto1(self):
943 buffer_size = 10
944 rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
945 bufio = self.tp(rawio, buffer_size=buffer_size)
946 b = bytearray(2)
947 self.assertEqual(bufio.peek(3), b'abc')
948 self.assertEqual(rawio._reads, 1)
949 self.assertEqual(bufio.readinto1(b), 2)
950 self.assertEqual(b, b"ab")
951 self.assertEqual(rawio._reads, 1)
952 self.assertEqual(bufio.readinto1(b), 1)
953 self.assertEqual(b[:1], b"c")
954 self.assertEqual(rawio._reads, 1)
955 self.assertEqual(bufio.readinto1(b), 2)
956 self.assertEqual(b, b"de")
957 self.assertEqual(rawio._reads, 2)
958 b = bytearray(2*buffer_size)
959 self.assertEqual(bufio.peek(3), b'fgh')
960 self.assertEqual(rawio._reads, 3)
961 self.assertEqual(bufio.readinto1(b), 6)
962 self.assertEqual(b[:6], b"fghjkl")
963 self.assertEqual(rawio._reads, 4)
964
965 def test_readinto_array(self):
966 buffer_size = 60
967 data = b"a" * 26
968 rawio = self.MockRawIO((data,))
969 bufio = self.tp(rawio, buffer_size=buffer_size)
970
971 # Create an array with element size > 1 byte
972 b = array.array('i', b'x' * 32)
973 assert len(b) != 16
974
975 # Read into it. We should get as many *bytes* as we can fit into b
976 # (which is more than the number of elements)
977 n = bufio.readinto(b)
978 self.assertGreater(n, len(b))
979
980 # Check that old contents of b are preserved
981 bm = memoryview(b).cast('B')
982 self.assertLess(n, len(bm))
983 self.assertEqual(bm[:n], data[:n])
984 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
985
986 def test_readinto1_array(self):
987 buffer_size = 60
988 data = b"a" * 26
989 rawio = self.MockRawIO((data,))
990 bufio = self.tp(rawio, buffer_size=buffer_size)
991
992 # Create an array with element size > 1 byte
993 b = array.array('i', b'x' * 32)
994 assert len(b) != 16
995
996 # Read into it. We should get as many *bytes* as we can fit into b
997 # (which is more than the number of elements)
998 n = bufio.readinto1(b)
999 self.assertGreater(n, len(b))
1000
1001 # Check that old contents of b are preserved
1002 bm = memoryview(b).cast('B')
1003 self.assertLess(n, len(bm))
1004 self.assertEqual(bm[:n], data[:n])
1005 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1006
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001007 def test_readlines(self):
1008 def bufio():
1009 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
1010 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001011 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
1012 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
1013 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001014
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001015 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001016 data = b"abcdefghi"
1017 dlen = len(data)
1018
1019 tests = [
1020 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
1021 [ 100, [ 3, 3, 3], [ dlen ] ],
1022 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
1023 ]
1024
1025 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001026 rawio = self.MockFileIO(data)
1027 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +00001028 pos = 0
1029 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +00001030 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +00001031 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001032 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +00001033 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +00001034
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001035 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +00001036 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001037 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
1038 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001039 self.assertEqual(b"abcd", bufio.read(6))
1040 self.assertEqual(b"e", bufio.read(1))
1041 self.assertEqual(b"fg", bufio.read())
1042 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +02001043 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001044 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +00001045
Victor Stinnera80987f2011-05-25 22:47:16 +02001046 rawio = self.MockRawIO((b"a", None, None))
1047 self.assertEqual(b"a", rawio.readall())
1048 self.assertIsNone(rawio.readall())
1049
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001050 def test_read_past_eof(self):
1051 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1052 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001053
Ezio Melottib3aedd42010-11-20 19:04:17 +00001054 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001055
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001056 def test_read_all(self):
1057 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1058 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001059
Ezio Melottib3aedd42010-11-20 19:04:17 +00001060 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001061
Victor Stinner45df8202010-04-28 22:31:17 +00001062 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001063 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001064 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001065 try:
1066 # Write out many bytes with exactly the same number of 0's,
1067 # 1's... 255's. This will help us check that concurrent reading
1068 # doesn't duplicate or forget contents.
1069 N = 1000
1070 l = list(range(256)) * N
1071 random.shuffle(l)
1072 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001073 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +00001074 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001075 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001076 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001077 errors = []
1078 results = []
1079 def f():
1080 try:
1081 # Intra-buffer read then buffer-flushing read
1082 for n in cycle([1, 19]):
1083 s = bufio.read(n)
1084 if not s:
1085 break
1086 # list.append() is atomic
1087 results.append(s)
1088 except Exception as e:
1089 errors.append(e)
1090 raise
1091 threads = [threading.Thread(target=f) for x in range(20)]
1092 for t in threads:
1093 t.start()
1094 time.sleep(0.02) # yield
1095 for t in threads:
1096 t.join()
1097 self.assertFalse(errors,
1098 "the following exceptions were caught: %r" % errors)
1099 s = b''.join(results)
1100 for i in range(256):
1101 c = bytes(bytearray([i]))
1102 self.assertEqual(s.count(c), N)
1103 finally:
1104 support.unlink(support.TESTFN)
1105
Antoine Pitrou1e44fec2011-10-04 12:26:20 +02001106 def test_unseekable(self):
1107 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1108 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1109 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1110 bufio.read(1)
1111 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1112 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1113
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001114 def test_misbehaved_io(self):
1115 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1116 bufio = self.tp(rawio)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001117 self.assertRaises(OSError, bufio.seek, 0)
1118 self.assertRaises(OSError, bufio.tell)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001119
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001120 def test_no_extraneous_read(self):
1121 # Issue #9550; when the raw IO object has satisfied the read request,
1122 # we should not issue any additional reads, otherwise it may block
1123 # (e.g. socket).
1124 bufsize = 16
1125 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1126 rawio = self.MockRawIO([b"x" * n])
1127 bufio = self.tp(rawio, bufsize)
1128 self.assertEqual(bufio.read(n), b"x" * n)
1129 # Simple case: one raw read is enough to satisfy the request.
1130 self.assertEqual(rawio._extraneous_reads, 0,
1131 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1132 # A more complex case where two raw reads are needed to satisfy
1133 # the request.
1134 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1135 bufio = self.tp(rawio, bufsize)
1136 self.assertEqual(bufio.read(n), b"x" * n)
1137 self.assertEqual(rawio._extraneous_reads, 0,
1138 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1139
1140
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001141class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001142 tp = io.BufferedReader
1143
1144 def test_constructor(self):
1145 BufferedReaderTest.test_constructor(self)
1146 # The allocation can succeed on 32-bit builds, e.g. with more
1147 # than 2GB RAM and a 64-bit kernel.
1148 if sys.maxsize > 0x7FFFFFFF:
1149 rawio = self.MockRawIO()
1150 bufio = self.tp(rawio)
1151 self.assertRaises((OverflowError, MemoryError, ValueError),
1152 bufio.__init__, rawio, sys.maxsize)
1153
1154 def test_initialization(self):
1155 rawio = self.MockRawIO([b"abc"])
1156 bufio = self.tp(rawio)
1157 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1158 self.assertRaises(ValueError, bufio.read)
1159 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1160 self.assertRaises(ValueError, bufio.read)
1161 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1162 self.assertRaises(ValueError, bufio.read)
1163
1164 def test_misbehaved_io_read(self):
1165 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1166 bufio = self.tp(rawio)
1167 # _pyio.BufferedReader seems to implement reading different, so that
1168 # checking this is not so easy.
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001169 self.assertRaises(OSError, bufio.read, 10)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001170
1171 def test_garbage_collection(self):
1172 # C BufferedReader objects are collected.
1173 # The Python version has __del__, so it ends into gc.garbage instead
Antoine Pitrou796564c2013-07-30 19:59:21 +02001174 with support.check_warnings(('', ResourceWarning)):
1175 rawio = self.FileIO(support.TESTFN, "w+b")
1176 f = self.tp(rawio)
1177 f.f = f
1178 wr = weakref.ref(f)
1179 del f
1180 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001181 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001182
R David Murray67bfe802013-02-23 21:51:05 -05001183 def test_args_error(self):
1184 # Issue #17275
1185 with self.assertRaisesRegex(TypeError, "BufferedReader"):
1186 self.tp(io.BytesIO(), 1024, 1024, 1024)
1187
1188
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001189class PyBufferedReaderTest(BufferedReaderTest):
1190 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001191
Guido van Rossuma9e20242007-03-08 00:43:48 +00001192
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001193class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1194 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001195
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001196 def test_constructor(self):
1197 rawio = self.MockRawIO()
1198 bufio = self.tp(rawio)
1199 bufio.__init__(rawio)
1200 bufio.__init__(rawio, buffer_size=1024)
1201 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001202 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001203 bufio.flush()
1204 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1205 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1206 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1207 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001208 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001209 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001210 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001211
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001212 def test_uninitialized(self):
1213 bufio = self.tp.__new__(self.tp)
1214 del bufio
1215 bufio = self.tp.__new__(self.tp)
1216 self.assertRaisesRegex((ValueError, AttributeError),
1217 'uninitialized|has no attribute',
1218 bufio.write, b'')
1219 bufio.__init__(self.MockRawIO())
1220 self.assertEqual(bufio.write(b''), 0)
1221
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001222 def test_detach_flush(self):
1223 raw = self.MockRawIO()
1224 buf = self.tp(raw)
1225 buf.write(b"howdy!")
1226 self.assertFalse(raw._write_stack)
1227 buf.detach()
1228 self.assertEqual(raw._write_stack, [b"howdy!"])
1229
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001230 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001231 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001232 writer = self.MockRawIO()
1233 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001234 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001235 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001236
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001237 def test_write_overflow(self):
1238 writer = self.MockRawIO()
1239 bufio = self.tp(writer, 8)
1240 contents = b"abcdefghijklmnop"
1241 for n in range(0, len(contents), 3):
1242 bufio.write(contents[n:n+3])
1243 flushed = b"".join(writer._write_stack)
1244 # At least (total - 8) bytes were implicitly flushed, perhaps more
1245 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001246 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001247
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001248 def check_writes(self, intermediate_func):
1249 # Lots of writes, test the flushed output is as expected.
1250 contents = bytes(range(256)) * 1000
1251 n = 0
1252 writer = self.MockRawIO()
1253 bufio = self.tp(writer, 13)
1254 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1255 def gen_sizes():
1256 for size in count(1):
1257 for i in range(15):
1258 yield size
1259 sizes = gen_sizes()
1260 while n < len(contents):
1261 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001262 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001263 intermediate_func(bufio)
1264 n += size
1265 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001266 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001267
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001268 def test_writes(self):
1269 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001270
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001271 def test_writes_and_flushes(self):
1272 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001273
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001274 def test_writes_and_seeks(self):
1275 def _seekabs(bufio):
1276 pos = bufio.tell()
1277 bufio.seek(pos + 1, 0)
1278 bufio.seek(pos - 1, 0)
1279 bufio.seek(pos, 0)
1280 self.check_writes(_seekabs)
1281 def _seekrel(bufio):
1282 pos = bufio.seek(0, 1)
1283 bufio.seek(+1, 1)
1284 bufio.seek(-1, 1)
1285 bufio.seek(pos, 0)
1286 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001287
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001288 def test_writes_and_truncates(self):
1289 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001290
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001291 def test_write_non_blocking(self):
1292 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001293 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001294
Ezio Melottib3aedd42010-11-20 19:04:17 +00001295 self.assertEqual(bufio.write(b"abcd"), 4)
1296 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001297 # 1 byte will be written, the rest will be buffered
1298 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001299 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001300
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001301 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1302 raw.block_on(b"0")
1303 try:
1304 bufio.write(b"opqrwxyz0123456789")
1305 except self.BlockingIOError as e:
1306 written = e.characters_written
1307 else:
1308 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001309 self.assertEqual(written, 16)
1310 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001311 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001312
Ezio Melottib3aedd42010-11-20 19:04:17 +00001313 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001314 s = raw.pop_written()
1315 # Previously buffered bytes were flushed
1316 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001317
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001318 def test_write_and_rewind(self):
1319 raw = io.BytesIO()
1320 bufio = self.tp(raw, 4)
1321 self.assertEqual(bufio.write(b"abcdef"), 6)
1322 self.assertEqual(bufio.tell(), 6)
1323 bufio.seek(0, 0)
1324 self.assertEqual(bufio.write(b"XY"), 2)
1325 bufio.seek(6, 0)
1326 self.assertEqual(raw.getvalue(), b"XYcdef")
1327 self.assertEqual(bufio.write(b"123456"), 6)
1328 bufio.flush()
1329 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001330
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001331 def test_flush(self):
1332 writer = self.MockRawIO()
1333 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001334 bufio.write(b"abc")
1335 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001336 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001337
Antoine Pitrou131a4892012-10-16 22:57:11 +02001338 def test_writelines(self):
1339 l = [b'ab', b'cd', b'ef']
1340 writer = self.MockRawIO()
1341 bufio = self.tp(writer, 8)
1342 bufio.writelines(l)
1343 bufio.flush()
1344 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1345
1346 def test_writelines_userlist(self):
1347 l = UserList([b'ab', b'cd', b'ef'])
1348 writer = self.MockRawIO()
1349 bufio = self.tp(writer, 8)
1350 bufio.writelines(l)
1351 bufio.flush()
1352 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1353
1354 def test_writelines_error(self):
1355 writer = self.MockRawIO()
1356 bufio = self.tp(writer, 8)
1357 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1358 self.assertRaises(TypeError, bufio.writelines, None)
1359 self.assertRaises(TypeError, bufio.writelines, 'abc')
1360
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001361 def test_destructor(self):
1362 writer = self.MockRawIO()
1363 bufio = self.tp(writer, 8)
1364 bufio.write(b"abc")
1365 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001366 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001367 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001368
1369 def test_truncate(self):
1370 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001371 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001372 bufio = self.tp(raw, 8)
1373 bufio.write(b"abcdef")
1374 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001375 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001376 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001377 self.assertEqual(f.read(), b"abc")
1378
Victor Stinner45df8202010-04-28 22:31:17 +00001379 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001380 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001381 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001382 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001383 # Write out many bytes from many threads and test they were
1384 # all flushed.
1385 N = 1000
1386 contents = bytes(range(256)) * N
1387 sizes = cycle([1, 19])
1388 n = 0
1389 queue = deque()
1390 while n < len(contents):
1391 size = next(sizes)
1392 queue.append(contents[n:n+size])
1393 n += size
1394 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001395 # We use a real file object because it allows us to
1396 # exercise situations where the GIL is released before
1397 # writing the buffer to the raw streams. This is in addition
1398 # to concurrency issues due to switching threads in the middle
1399 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001400 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001401 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001402 errors = []
1403 def f():
1404 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001405 while True:
1406 try:
1407 s = queue.popleft()
1408 except IndexError:
1409 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001410 bufio.write(s)
1411 except Exception as e:
1412 errors.append(e)
1413 raise
1414 threads = [threading.Thread(target=f) for x in range(20)]
1415 for t in threads:
1416 t.start()
1417 time.sleep(0.02) # yield
1418 for t in threads:
1419 t.join()
1420 self.assertFalse(errors,
1421 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001422 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001423 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001424 s = f.read()
1425 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001426 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001427 finally:
1428 support.unlink(support.TESTFN)
1429
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001430 def test_misbehaved_io(self):
1431 rawio = self.MisbehavedRawIO()
1432 bufio = self.tp(rawio, 5)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001433 self.assertRaises(OSError, bufio.seek, 0)
1434 self.assertRaises(OSError, bufio.tell)
1435 self.assertRaises(OSError, bufio.write, b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001436
Florent Xicluna109d5732012-07-07 17:03:22 +02001437 def test_max_buffer_size_removal(self):
1438 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001439 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001440
Benjamin Peterson68623612012-12-20 11:53:11 -06001441 def test_write_error_on_close(self):
1442 raw = self.MockRawIO()
1443 def bad_write(b):
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001444 raise OSError()
Benjamin Peterson68623612012-12-20 11:53:11 -06001445 raw.write = bad_write
1446 b = self.tp(raw)
1447 b.write(b'spam')
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001448 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001449 self.assertTrue(b.closed)
1450
Benjamin Peterson59406a92009-03-26 17:10:29 +00001451
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001452class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001453 tp = io.BufferedWriter
1454
1455 def test_constructor(self):
1456 BufferedWriterTest.test_constructor(self)
1457 # The allocation can succeed on 32-bit builds, e.g. with more
1458 # than 2GB RAM and a 64-bit kernel.
1459 if sys.maxsize > 0x7FFFFFFF:
1460 rawio = self.MockRawIO()
1461 bufio = self.tp(rawio)
1462 self.assertRaises((OverflowError, MemoryError, ValueError),
1463 bufio.__init__, rawio, sys.maxsize)
1464
1465 def test_initialization(self):
1466 rawio = self.MockRawIO()
1467 bufio = self.tp(rawio)
1468 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1469 self.assertRaises(ValueError, bufio.write, b"def")
1470 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1471 self.assertRaises(ValueError, bufio.write, b"def")
1472 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1473 self.assertRaises(ValueError, bufio.write, b"def")
1474
1475 def test_garbage_collection(self):
1476 # C BufferedWriter objects are collected, and collecting them flushes
1477 # all data to disk.
1478 # The Python version has __del__, so it ends into gc.garbage instead
Antoine Pitrou796564c2013-07-30 19:59:21 +02001479 with support.check_warnings(('', ResourceWarning)):
1480 rawio = self.FileIO(support.TESTFN, "w+b")
1481 f = self.tp(rawio)
1482 f.write(b"123xxx")
1483 f.x = f
1484 wr = weakref.ref(f)
1485 del f
1486 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001487 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001488 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001489 self.assertEqual(f.read(), b"123xxx")
1490
R David Murray67bfe802013-02-23 21:51:05 -05001491 def test_args_error(self):
1492 # Issue #17275
1493 with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1494 self.tp(io.BytesIO(), 1024, 1024, 1024)
1495
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001496
1497class PyBufferedWriterTest(BufferedWriterTest):
1498 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001499
Guido van Rossum01a27522007-03-07 01:00:12 +00001500class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001501
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001502 def test_constructor(self):
1503 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001504 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001505
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001506 def test_uninitialized(self):
1507 pair = self.tp.__new__(self.tp)
1508 del pair
1509 pair = self.tp.__new__(self.tp)
1510 self.assertRaisesRegex((ValueError, AttributeError),
1511 'uninitialized|has no attribute',
1512 pair.read, 0)
1513 self.assertRaisesRegex((ValueError, AttributeError),
1514 'uninitialized|has no attribute',
1515 pair.write, b'')
1516 pair.__init__(self.MockRawIO(), self.MockRawIO())
1517 self.assertEqual(pair.read(0), b'')
1518 self.assertEqual(pair.write(b''), 0)
1519
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001520 def test_detach(self):
1521 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1522 self.assertRaises(self.UnsupportedOperation, pair.detach)
1523
Florent Xicluna109d5732012-07-07 17:03:22 +02001524 def test_constructor_max_buffer_size_removal(self):
1525 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001526 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001527
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001528 def test_constructor_with_not_readable(self):
1529 class NotReadable(MockRawIO):
1530 def readable(self):
1531 return False
1532
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001533 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001534
1535 def test_constructor_with_not_writeable(self):
1536 class NotWriteable(MockRawIO):
1537 def writable(self):
1538 return False
1539
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001540 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001541
1542 def test_read(self):
1543 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1544
1545 self.assertEqual(pair.read(3), b"abc")
1546 self.assertEqual(pair.read(1), b"d")
1547 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001548 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1549 self.assertEqual(pair.read(None), b"abc")
1550
1551 def test_readlines(self):
1552 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1553 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1554 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1555 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001556
1557 def test_read1(self):
1558 # .read1() is delegated to the underlying reader object, so this test
1559 # can be shallow.
1560 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1561
1562 self.assertEqual(pair.read1(3), b"abc")
1563
1564 def test_readinto(self):
1565 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1566
1567 data = bytearray(5)
1568 self.assertEqual(pair.readinto(data), 5)
1569 self.assertEqual(data, b"abcde")
1570
1571 def test_write(self):
1572 w = self.MockRawIO()
1573 pair = self.tp(self.MockRawIO(), w)
1574
1575 pair.write(b"abc")
1576 pair.flush()
1577 pair.write(b"def")
1578 pair.flush()
1579 self.assertEqual(w._write_stack, [b"abc", b"def"])
1580
1581 def test_peek(self):
1582 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1583
1584 self.assertTrue(pair.peek(3).startswith(b"abc"))
1585 self.assertEqual(pair.read(3), b"abc")
1586
1587 def test_readable(self):
1588 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1589 self.assertTrue(pair.readable())
1590
1591 def test_writeable(self):
1592 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1593 self.assertTrue(pair.writable())
1594
1595 def test_seekable(self):
1596 # BufferedRWPairs are never seekable, even if their readers and writers
1597 # are.
1598 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1599 self.assertFalse(pair.seekable())
1600
1601 # .flush() is delegated to the underlying writer object and has been
1602 # tested in the test_write method.
1603
1604 def test_close_and_closed(self):
1605 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1606 self.assertFalse(pair.closed)
1607 pair.close()
1608 self.assertTrue(pair.closed)
1609
1610 def test_isatty(self):
1611 class SelectableIsAtty(MockRawIO):
1612 def __init__(self, isatty):
1613 MockRawIO.__init__(self)
1614 self._isatty = isatty
1615
1616 def isatty(self):
1617 return self._isatty
1618
1619 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1620 self.assertFalse(pair.isatty())
1621
1622 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1623 self.assertTrue(pair.isatty())
1624
1625 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1626 self.assertTrue(pair.isatty())
1627
1628 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1629 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001630
Benjamin Petersonbbd0a322014-09-29 22:46:57 -04001631 def test_weakref_clearing(self):
1632 brw = self.tp(self.MockRawIO(), self.MockRawIO())
1633 ref = weakref.ref(brw)
1634 brw = None
1635 ref = None # Shouldn't segfault.
1636
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001637class CBufferedRWPairTest(BufferedRWPairTest):
1638 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001639
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001640class PyBufferedRWPairTest(BufferedRWPairTest):
1641 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001642
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001643
1644class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1645 read_mode = "rb+"
1646 write_mode = "wb+"
1647
1648 def test_constructor(self):
1649 BufferedReaderTest.test_constructor(self)
1650 BufferedWriterTest.test_constructor(self)
1651
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001652 def test_uninitialized(self):
1653 BufferedReaderTest.test_uninitialized(self)
1654 BufferedWriterTest.test_uninitialized(self)
1655
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001656 def test_read_and_write(self):
1657 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001658 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001659
1660 self.assertEqual(b"as", rw.read(2))
1661 rw.write(b"ddd")
1662 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001663 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001664 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001665 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001666
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001667 def test_seek_and_tell(self):
1668 raw = self.BytesIO(b"asdfghjkl")
1669 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001670
Ezio Melottib3aedd42010-11-20 19:04:17 +00001671 self.assertEqual(b"as", rw.read(2))
1672 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001673 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001674 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00001675
Antoine Pitroue05565e2011-08-20 14:39:23 +02001676 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00001677 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02001678 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001679 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001680 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001681 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001682 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001683 self.assertEqual(7, rw.tell())
1684 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02001685 rw.flush()
1686 self.assertEqual(b"asdf123fl", raw.getvalue())
1687
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001688 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001689
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001690 def check_flush_and_read(self, read_func):
1691 raw = self.BytesIO(b"abcdefghi")
1692 bufio = self.tp(raw)
1693
Ezio Melottib3aedd42010-11-20 19:04:17 +00001694 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001695 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001696 self.assertEqual(b"ef", read_func(bufio, 2))
1697 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001698 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001699 self.assertEqual(6, bufio.tell())
1700 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001701 raw.seek(0, 0)
1702 raw.write(b"XYZ")
1703 # flush() resets the read buffer
1704 bufio.flush()
1705 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001706 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001707
1708 def test_flush_and_read(self):
1709 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1710
1711 def test_flush_and_readinto(self):
1712 def _readinto(bufio, n=-1):
1713 b = bytearray(n if n >= 0 else 9999)
1714 n = bufio.readinto(b)
1715 return bytes(b[:n])
1716 self.check_flush_and_read(_readinto)
1717
1718 def test_flush_and_peek(self):
1719 def _peek(bufio, n=-1):
1720 # This relies on the fact that the buffer can contain the whole
1721 # raw stream, otherwise peek() can return less.
1722 b = bufio.peek(n)
1723 if n != -1:
1724 b = b[:n]
1725 bufio.seek(len(b), 1)
1726 return b
1727 self.check_flush_and_read(_peek)
1728
1729 def test_flush_and_write(self):
1730 raw = self.BytesIO(b"abcdefghi")
1731 bufio = self.tp(raw)
1732
1733 bufio.write(b"123")
1734 bufio.flush()
1735 bufio.write(b"45")
1736 bufio.flush()
1737 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001738 self.assertEqual(b"12345fghi", raw.getvalue())
1739 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001740
1741 def test_threads(self):
1742 BufferedReaderTest.test_threads(self)
1743 BufferedWriterTest.test_threads(self)
1744
1745 def test_writes_and_peek(self):
1746 def _peek(bufio):
1747 bufio.peek(1)
1748 self.check_writes(_peek)
1749 def _peek(bufio):
1750 pos = bufio.tell()
1751 bufio.seek(-1, 1)
1752 bufio.peek(1)
1753 bufio.seek(pos, 0)
1754 self.check_writes(_peek)
1755
1756 def test_writes_and_reads(self):
1757 def _read(bufio):
1758 bufio.seek(-1, 1)
1759 bufio.read(1)
1760 self.check_writes(_read)
1761
1762 def test_writes_and_read1s(self):
1763 def _read1(bufio):
1764 bufio.seek(-1, 1)
1765 bufio.read1(1)
1766 self.check_writes(_read1)
1767
1768 def test_writes_and_readintos(self):
1769 def _read(bufio):
1770 bufio.seek(-1, 1)
1771 bufio.readinto(bytearray(1))
1772 self.check_writes(_read)
1773
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001774 def test_write_after_readahead(self):
1775 # Issue #6629: writing after the buffer was filled by readahead should
1776 # first rewind the raw stream.
1777 for overwrite_size in [1, 5]:
1778 raw = self.BytesIO(b"A" * 10)
1779 bufio = self.tp(raw, 4)
1780 # Trigger readahead
1781 self.assertEqual(bufio.read(1), b"A")
1782 self.assertEqual(bufio.tell(), 1)
1783 # Overwriting should rewind the raw stream if it needs so
1784 bufio.write(b"B" * overwrite_size)
1785 self.assertEqual(bufio.tell(), overwrite_size + 1)
1786 # If the write size was smaller than the buffer size, flush() and
1787 # check that rewind happens.
1788 bufio.flush()
1789 self.assertEqual(bufio.tell(), overwrite_size + 1)
1790 s = raw.getvalue()
1791 self.assertEqual(s,
1792 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1793
Antoine Pitrou7c404892011-05-13 00:13:33 +02001794 def test_write_rewind_write(self):
1795 # Various combinations of reading / writing / seeking backwards / writing again
1796 def mutate(bufio, pos1, pos2):
1797 assert pos2 >= pos1
1798 # Fill the buffer
1799 bufio.seek(pos1)
1800 bufio.read(pos2 - pos1)
1801 bufio.write(b'\x02')
1802 # This writes earlier than the previous write, but still inside
1803 # the buffer.
1804 bufio.seek(pos1)
1805 bufio.write(b'\x01')
1806
1807 b = b"\x80\x81\x82\x83\x84"
1808 for i in range(0, len(b)):
1809 for j in range(i, len(b)):
1810 raw = self.BytesIO(b)
1811 bufio = self.tp(raw, 100)
1812 mutate(bufio, i, j)
1813 bufio.flush()
1814 expected = bytearray(b)
1815 expected[j] = 2
1816 expected[i] = 1
1817 self.assertEqual(raw.getvalue(), expected,
1818 "failed result for i=%d, j=%d" % (i, j))
1819
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001820 def test_truncate_after_read_or_write(self):
1821 raw = self.BytesIO(b"A" * 10)
1822 bufio = self.tp(raw, 100)
1823 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1824 self.assertEqual(bufio.truncate(), 2)
1825 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1826 self.assertEqual(bufio.truncate(), 4)
1827
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001828 def test_misbehaved_io(self):
1829 BufferedReaderTest.test_misbehaved_io(self)
1830 BufferedWriterTest.test_misbehaved_io(self)
1831
Antoine Pitroue05565e2011-08-20 14:39:23 +02001832 def test_interleaved_read_write(self):
1833 # Test for issue #12213
1834 with self.BytesIO(b'abcdefgh') as raw:
1835 with self.tp(raw, 100) as f:
1836 f.write(b"1")
1837 self.assertEqual(f.read(1), b'b')
1838 f.write(b'2')
1839 self.assertEqual(f.read1(1), b'd')
1840 f.write(b'3')
1841 buf = bytearray(1)
1842 f.readinto(buf)
1843 self.assertEqual(buf, b'f')
1844 f.write(b'4')
1845 self.assertEqual(f.peek(1), b'h')
1846 f.flush()
1847 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1848
1849 with self.BytesIO(b'abc') as raw:
1850 with self.tp(raw, 100) as f:
1851 self.assertEqual(f.read(1), b'a')
1852 f.write(b"2")
1853 self.assertEqual(f.read(1), b'c')
1854 f.flush()
1855 self.assertEqual(raw.getvalue(), b'a2c')
1856
1857 def test_interleaved_readline_write(self):
1858 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1859 with self.tp(raw) as f:
1860 f.write(b'1')
1861 self.assertEqual(f.readline(), b'b\n')
1862 f.write(b'2')
1863 self.assertEqual(f.readline(), b'def\n')
1864 f.write(b'3')
1865 self.assertEqual(f.readline(), b'\n')
1866 f.flush()
1867 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1868
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001869 # You can't construct a BufferedRandom over a non-seekable stream.
1870 test_unseekable = None
1871
R David Murray67bfe802013-02-23 21:51:05 -05001872
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001873class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001874 tp = io.BufferedRandom
1875
1876 def test_constructor(self):
1877 BufferedRandomTest.test_constructor(self)
1878 # The allocation can succeed on 32-bit builds, e.g. with more
1879 # than 2GB RAM and a 64-bit kernel.
1880 if sys.maxsize > 0x7FFFFFFF:
1881 rawio = self.MockRawIO()
1882 bufio = self.tp(rawio)
1883 self.assertRaises((OverflowError, MemoryError, ValueError),
1884 bufio.__init__, rawio, sys.maxsize)
1885
1886 def test_garbage_collection(self):
1887 CBufferedReaderTest.test_garbage_collection(self)
1888 CBufferedWriterTest.test_garbage_collection(self)
1889
R David Murray67bfe802013-02-23 21:51:05 -05001890 def test_args_error(self):
1891 # Issue #17275
1892 with self.assertRaisesRegex(TypeError, "BufferedRandom"):
1893 self.tp(io.BytesIO(), 1024, 1024, 1024)
1894
1895
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001896class PyBufferedRandomTest(BufferedRandomTest):
1897 tp = pyio.BufferedRandom
1898
1899
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001900# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1901# properties:
1902# - A single output character can correspond to many bytes of input.
1903# - The number of input bytes to complete the character can be
1904# undetermined until the last input byte is received.
1905# - The number of input bytes can vary depending on previous input.
1906# - A single input byte can correspond to many characters of output.
1907# - The number of output characters can be undetermined until the
1908# last input byte is received.
1909# - The number of output characters can vary depending on previous input.
1910
1911class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1912 """
1913 For testing seek/tell behavior with a stateful, buffering decoder.
1914
1915 Input is a sequence of words. Words may be fixed-length (length set
1916 by input) or variable-length (period-terminated). In variable-length
1917 mode, extra periods are ignored. Possible words are:
1918 - 'i' followed by a number sets the input length, I (maximum 99).
1919 When I is set to 0, words are space-terminated.
1920 - 'o' followed by a number sets the output length, O (maximum 99).
1921 - Any other word is converted into a word followed by a period on
1922 the output. The output word consists of the input word truncated
1923 or padded out with hyphens to make its length equal to O. If O
1924 is 0, the word is output verbatim without truncating or padding.
1925 I and O are initially set to 1. When I changes, any buffered input is
1926 re-scanned according to the new I. EOF also terminates the last word.
1927 """
1928
1929 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001930 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001931 self.reset()
1932
1933 def __repr__(self):
1934 return '<SID %x>' % id(self)
1935
1936 def reset(self):
1937 self.i = 1
1938 self.o = 1
1939 self.buffer = bytearray()
1940
1941 def getstate(self):
1942 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1943 return bytes(self.buffer), i*100 + o
1944
1945 def setstate(self, state):
1946 buffer, io = state
1947 self.buffer = bytearray(buffer)
1948 i, o = divmod(io, 100)
1949 self.i, self.o = i ^ 1, o ^ 1
1950
1951 def decode(self, input, final=False):
1952 output = ''
1953 for b in input:
1954 if self.i == 0: # variable-length, terminated with period
1955 if b == ord('.'):
1956 if self.buffer:
1957 output += self.process_word()
1958 else:
1959 self.buffer.append(b)
1960 else: # fixed-length, terminate after self.i bytes
1961 self.buffer.append(b)
1962 if len(self.buffer) == self.i:
1963 output += self.process_word()
1964 if final and self.buffer: # EOF terminates the last word
1965 output += self.process_word()
1966 return output
1967
1968 def process_word(self):
1969 output = ''
1970 if self.buffer[0] == ord('i'):
1971 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1972 elif self.buffer[0] == ord('o'):
1973 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1974 else:
1975 output = self.buffer.decode('ascii')
1976 if len(output) < self.o:
1977 output += '-'*self.o # pad out with hyphens
1978 if self.o:
1979 output = output[:self.o] # truncate to output length
1980 output += '.'
1981 self.buffer = bytearray()
1982 return output
1983
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001984 codecEnabled = False
1985
1986 @classmethod
1987 def lookupTestDecoder(cls, name):
1988 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001989 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001990 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001991 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001992 incrementalencoder=None,
1993 streamreader=None, streamwriter=None,
1994 incrementaldecoder=cls)
1995
1996# Register the previous decoder for testing.
1997# Disabled by default, tests will enable it.
1998codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1999
2000
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002001class StatefulIncrementalDecoderTest(unittest.TestCase):
2002 """
2003 Make sure the StatefulIncrementalDecoder actually works.
2004 """
2005
2006 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002007 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002008 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002009 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002010 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002011 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002012 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002013 # I=0, O=6 (variable-length input, fixed-length output)
2014 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
2015 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002016 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002017 # I=6, O=3 (fixed-length input > fixed-length output)
2018 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
2019 # I=0, then 3; O=29, then 15 (with longer output)
2020 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
2021 'a----------------------------.' +
2022 'b----------------------------.' +
2023 'cde--------------------------.' +
2024 'abcdefghijabcde.' +
2025 'a.b------------.' +
2026 '.c.------------.' +
2027 'd.e------------.' +
2028 'k--------------.' +
2029 'l--------------.' +
2030 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002031 ]
2032
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002033 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002034 # Try a few one-shot test cases.
2035 for input, eof, output in self.test_cases:
2036 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002037 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002038
2039 # Also test an unfinished decode, followed by forcing EOF.
2040 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002041 self.assertEqual(d.decode(b'oiabcd'), '')
2042 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002043
2044class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002045
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002046 def setUp(self):
2047 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2048 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002049 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002050
Guido van Rossumd0712812007-04-11 16:32:43 +00002051 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002052 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002053
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002054 def test_constructor(self):
2055 r = self.BytesIO(b"\xc3\xa9\n\n")
2056 b = self.BufferedReader(r, 1000)
2057 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002058 t.__init__(b, encoding="latin-1", newline="\r\n")
2059 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002060 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002061 t.__init__(b, encoding="utf-8", line_buffering=True)
2062 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002063 self.assertEqual(t.line_buffering, True)
2064 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002065 self.assertRaises(TypeError, t.__init__, b, newline=42)
2066 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2067
Nick Coghlana9b15242014-02-04 22:11:18 +10002068 def test_non_text_encoding_codecs_are_rejected(self):
2069 # Ensure the constructor complains if passed a codec that isn't
2070 # marked as a text encoding
2071 # http://bugs.python.org/issue20404
2072 r = self.BytesIO()
2073 b = self.BufferedWriter(r)
2074 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2075 self.TextIOWrapper(b, encoding="hex")
2076
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002077 def test_detach(self):
2078 r = self.BytesIO()
2079 b = self.BufferedWriter(r)
2080 t = self.TextIOWrapper(b)
2081 self.assertIs(t.detach(), b)
2082
2083 t = self.TextIOWrapper(b, encoding="ascii")
2084 t.write("howdy")
2085 self.assertFalse(r.getvalue())
2086 t.detach()
2087 self.assertEqual(r.getvalue(), b"howdy")
2088 self.assertRaises(ValueError, t.detach)
2089
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002090 def test_repr(self):
2091 raw = self.BytesIO("hello".encode("utf-8"))
2092 b = self.BufferedReader(raw)
2093 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002094 modname = self.TextIOWrapper.__module__
2095 self.assertEqual(repr(t),
2096 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
2097 raw.name = "dummy"
2098 self.assertEqual(repr(t),
2099 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002100 t.mode = "r"
2101 self.assertEqual(repr(t),
2102 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002103 raw.name = b"dummy"
2104 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002105 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002106
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002107 def test_line_buffering(self):
2108 r = self.BytesIO()
2109 b = self.BufferedWriter(r, 1000)
2110 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002111 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002112 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002113 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002114 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002115 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002116 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002117
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002118 def test_default_encoding(self):
2119 old_environ = dict(os.environ)
2120 try:
2121 # try to get a user preferred encoding different than the current
2122 # locale encoding to check that TextIOWrapper() uses the current
2123 # locale encoding and not the user preferred encoding
2124 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2125 if key in os.environ:
2126 del os.environ[key]
2127
2128 current_locale_encoding = locale.getpreferredencoding(False)
2129 b = self.BytesIO()
2130 t = self.TextIOWrapper(b)
2131 self.assertEqual(t.encoding, current_locale_encoding)
2132 finally:
2133 os.environ.clear()
2134 os.environ.update(old_environ)
2135
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002136 @support.cpython_only
Serhiy Storchaka78980432013-01-15 01:12:17 +02002137 def test_device_encoding(self):
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002138 # Issue 15989
2139 import _testcapi
Serhiy Storchaka78980432013-01-15 01:12:17 +02002140 b = self.BytesIO()
2141 b.fileno = lambda: _testcapi.INT_MAX + 1
2142 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2143 b.fileno = lambda: _testcapi.UINT_MAX + 1
2144 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2145
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002146 def test_encoding(self):
2147 # Check the encoding attribute is always set, and valid
2148 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002149 t = self.TextIOWrapper(b, encoding="utf-8")
2150 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002151 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002152 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002153 codecs.lookup(t.encoding)
2154
2155 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002156 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002157 b = self.BytesIO(b"abc\n\xff\n")
2158 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002159 self.assertRaises(UnicodeError, t.read)
2160 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002161 b = self.BytesIO(b"abc\n\xff\n")
2162 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002163 self.assertRaises(UnicodeError, t.read)
2164 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002165 b = self.BytesIO(b"abc\n\xff\n")
2166 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002167 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002168 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002169 b = self.BytesIO(b"abc\n\xff\n")
2170 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002171 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002172
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002173 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002174 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002175 b = self.BytesIO()
2176 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002177 self.assertRaises(UnicodeError, t.write, "\xff")
2178 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002179 b = self.BytesIO()
2180 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002181 self.assertRaises(UnicodeError, t.write, "\xff")
2182 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002183 b = self.BytesIO()
2184 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002185 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002186 t.write("abc\xffdef\n")
2187 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002188 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002189 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002190 b = self.BytesIO()
2191 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002192 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002193 t.write("abc\xffdef\n")
2194 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002195 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002196
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002197 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002198 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2199
2200 tests = [
2201 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002202 [ '', input_lines ],
2203 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2204 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2205 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002206 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002207 encodings = (
2208 'utf-8', 'latin-1',
2209 'utf-16', 'utf-16-le', 'utf-16-be',
2210 'utf-32', 'utf-32-le', 'utf-32-be',
2211 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002212
Guido van Rossum8358db22007-08-18 21:39:55 +00002213 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002214 # character in TextIOWrapper._pending_line.
2215 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002216 # XXX: str.encode() should return bytes
2217 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002218 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002219 for bufsize in range(1, 10):
2220 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002221 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2222 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002223 encoding=encoding)
2224 if do_reads:
2225 got_lines = []
2226 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002227 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002228 if c2 == '':
2229 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002230 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002231 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002232 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002233 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002234
2235 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002236 self.assertEqual(got_line, exp_line)
2237 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002238
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002239 def test_newlines_input(self):
2240 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002241 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2242 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002243 (None, normalized.decode("ascii").splitlines(keepends=True)),
2244 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002245 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2246 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2247 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002248 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002249 buf = self.BytesIO(testdata)
2250 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002251 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002252 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002253 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002254
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002255 def test_newlines_output(self):
2256 testdict = {
2257 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2258 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2259 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2260 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2261 }
2262 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2263 for newline, expected in tests:
2264 buf = self.BytesIO()
2265 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2266 txt.write("AAA\nB")
2267 txt.write("BB\nCCC\n")
2268 txt.write("X\rY\r\nZ")
2269 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002270 self.assertEqual(buf.closed, False)
2271 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002272
2273 def test_destructor(self):
2274 l = []
2275 base = self.BytesIO
2276 class MyBytesIO(base):
2277 def close(self):
2278 l.append(self.getvalue())
2279 base.close(self)
2280 b = MyBytesIO()
2281 t = self.TextIOWrapper(b, encoding="ascii")
2282 t.write("abc")
2283 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002284 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002285 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002286
2287 def test_override_destructor(self):
2288 record = []
2289 class MyTextIO(self.TextIOWrapper):
2290 def __del__(self):
2291 record.append(1)
2292 try:
2293 f = super().__del__
2294 except AttributeError:
2295 pass
2296 else:
2297 f()
2298 def close(self):
2299 record.append(2)
2300 super().close()
2301 def flush(self):
2302 record.append(3)
2303 super().flush()
2304 b = self.BytesIO()
2305 t = MyTextIO(b, encoding="ascii")
2306 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002307 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002308 self.assertEqual(record, [1, 2, 3])
2309
2310 def test_error_through_destructor(self):
2311 # Test that the exception state is not modified by a destructor,
2312 # even if close() fails.
2313 rawio = self.CloseFailureIO()
2314 def f():
2315 self.TextIOWrapper(rawio).xyzzy
2316 with support.captured_output("stderr") as s:
2317 self.assertRaises(AttributeError, f)
2318 s = s.getvalue().strip()
2319 if s:
2320 # The destructor *may* have printed an unraisable error, check it
2321 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002322 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002323 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002324
Guido van Rossum9b76da62007-04-11 01:09:03 +00002325 # Systematic tests of the text I/O API
2326
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002327 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002328 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002329 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002330 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002331 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002332 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002333 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002334 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002335 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002336 self.assertEqual(f.tell(), 0)
2337 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002338 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002339 self.assertEqual(f.seek(0), 0)
2340 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002341 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002342 self.assertEqual(f.read(2), "ab")
2343 self.assertEqual(f.read(1), "c")
2344 self.assertEqual(f.read(1), "")
2345 self.assertEqual(f.read(), "")
2346 self.assertEqual(f.tell(), cookie)
2347 self.assertEqual(f.seek(0), 0)
2348 self.assertEqual(f.seek(0, 2), cookie)
2349 self.assertEqual(f.write("def"), 3)
2350 self.assertEqual(f.seek(cookie), cookie)
2351 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002352 if enc.startswith("utf"):
2353 self.multi_line_test(f, enc)
2354 f.close()
2355
2356 def multi_line_test(self, f, enc):
2357 f.seek(0)
2358 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002359 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002360 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002361 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002362 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002363 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002364 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002365 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002366 wlines.append((f.tell(), line))
2367 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002368 f.seek(0)
2369 rlines = []
2370 while True:
2371 pos = f.tell()
2372 line = f.readline()
2373 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002374 break
2375 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002376 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002377
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002378 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002379 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002380 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002381 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002382 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002383 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002384 p2 = f.tell()
2385 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002386 self.assertEqual(f.tell(), p0)
2387 self.assertEqual(f.readline(), "\xff\n")
2388 self.assertEqual(f.tell(), p1)
2389 self.assertEqual(f.readline(), "\xff\n")
2390 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002391 f.seek(0)
2392 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002393 self.assertEqual(line, "\xff\n")
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002394 self.assertRaises(OSError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002395 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002396 f.close()
2397
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002398 def test_seeking(self):
2399 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002400 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002401 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002402 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002403 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002404 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002405 suffix = bytes(u_suffix.encode("utf-8"))
2406 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002407 with self.open(support.TESTFN, "wb") as f:
2408 f.write(line*2)
2409 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2410 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002411 self.assertEqual(s, str(prefix, "ascii"))
2412 self.assertEqual(f.tell(), prefix_size)
2413 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002414
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002415 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002416 # Regression test for a specific bug
2417 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002418 with self.open(support.TESTFN, "wb") as f:
2419 f.write(data)
2420 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2421 f._CHUNK_SIZE # Just test that it exists
2422 f._CHUNK_SIZE = 2
2423 f.readline()
2424 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002425
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002426 def test_seek_and_tell(self):
2427 #Test seek/tell using the StatefulIncrementalDecoder.
2428 # Make test faster by doing smaller seeks
2429 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002430
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002431 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002432 """Tell/seek to various points within a data stream and ensure
2433 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002434 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002435 f.write(data)
2436 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002437 f = self.open(support.TESTFN, encoding='test_decoder')
2438 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002439 decoded = f.read()
2440 f.close()
2441
Neal Norwitze2b07052008-03-18 19:52:05 +00002442 for i in range(min_pos, len(decoded) + 1): # seek positions
2443 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002444 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002445 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002446 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002447 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002448 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002449 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002450 f.close()
2451
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002452 # Enable the test decoder.
2453 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002454
2455 # Run the tests.
2456 try:
2457 # Try each test case.
2458 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002459 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002460
2461 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002462 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2463 offset = CHUNK_SIZE - len(input)//2
2464 prefix = b'.'*offset
2465 # Don't bother seeking into the prefix (takes too long).
2466 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002467 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002468
2469 # Ensure our test decoder won't interfere with subsequent tests.
2470 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002471 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002472
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002473 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002474 data = "1234567890"
2475 tests = ("utf-16",
2476 "utf-16-le",
2477 "utf-16-be",
2478 "utf-32",
2479 "utf-32-le",
2480 "utf-32-be")
2481 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002482 buf = self.BytesIO()
2483 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002484 # Check if the BOM is written only once (see issue1753).
2485 f.write(data)
2486 f.write(data)
2487 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002488 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002489 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002490 self.assertEqual(f.read(), data * 2)
2491 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002492
Benjamin Petersona1b49012009-03-31 23:11:32 +00002493 def test_unreadable(self):
2494 class UnReadable(self.BytesIO):
2495 def readable(self):
2496 return False
2497 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002498 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00002499
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002500 def test_read_one_by_one(self):
2501 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002502 reads = ""
2503 while True:
2504 c = txt.read(1)
2505 if not c:
2506 break
2507 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002508 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002509
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002510 def test_readlines(self):
2511 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2512 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2513 txt.seek(0)
2514 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2515 txt.seek(0)
2516 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2517
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002518 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002519 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002520 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002521 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002522 reads = ""
2523 while True:
2524 c = txt.read(128)
2525 if not c:
2526 break
2527 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002528 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002529
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002530 def test_writelines(self):
2531 l = ['ab', 'cd', 'ef']
2532 buf = self.BytesIO()
2533 txt = self.TextIOWrapper(buf)
2534 txt.writelines(l)
2535 txt.flush()
2536 self.assertEqual(buf.getvalue(), b'abcdef')
2537
2538 def test_writelines_userlist(self):
2539 l = UserList(['ab', 'cd', 'ef'])
2540 buf = self.BytesIO()
2541 txt = self.TextIOWrapper(buf)
2542 txt.writelines(l)
2543 txt.flush()
2544 self.assertEqual(buf.getvalue(), b'abcdef')
2545
2546 def test_writelines_error(self):
2547 txt = self.TextIOWrapper(self.BytesIO())
2548 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2549 self.assertRaises(TypeError, txt.writelines, None)
2550 self.assertRaises(TypeError, txt.writelines, b'abc')
2551
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002552 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002553 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002554
2555 # read one char at a time
2556 reads = ""
2557 while True:
2558 c = txt.read(1)
2559 if not c:
2560 break
2561 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002562 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002563
2564 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002565 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002566 txt._CHUNK_SIZE = 4
2567
2568 reads = ""
2569 while True:
2570 c = txt.read(4)
2571 if not c:
2572 break
2573 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002574 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002575
2576 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002577 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002578 txt._CHUNK_SIZE = 4
2579
2580 reads = txt.read(4)
2581 reads += txt.read(4)
2582 reads += txt.readline()
2583 reads += txt.readline()
2584 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002585 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002586
2587 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002588 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002589 txt._CHUNK_SIZE = 4
2590
2591 reads = txt.read(4)
2592 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002593 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002594
2595 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002596 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002597 txt._CHUNK_SIZE = 4
2598
2599 reads = txt.read(4)
2600 pos = txt.tell()
2601 txt.seek(0)
2602 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002603 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002604
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002605 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002606 buffer = self.BytesIO(self.testdata)
2607 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002608
2609 self.assertEqual(buffer.seekable(), txt.seekable())
2610
Antoine Pitroue4501852009-05-14 18:55:55 +00002611 def test_append_bom(self):
2612 # The BOM is not written again when appending to a non-empty file
2613 filename = support.TESTFN
2614 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2615 with self.open(filename, 'w', encoding=charset) as f:
2616 f.write('aaa')
2617 pos = f.tell()
2618 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002619 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002620
2621 with self.open(filename, 'a', encoding=charset) as f:
2622 f.write('xxx')
2623 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002624 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002625
2626 def test_seek_bom(self):
2627 # Same test, but when seeking manually
2628 filename = support.TESTFN
2629 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2630 with self.open(filename, 'w', encoding=charset) as f:
2631 f.write('aaa')
2632 pos = f.tell()
2633 with self.open(filename, 'r+', encoding=charset) as f:
2634 f.seek(pos)
2635 f.write('zzz')
2636 f.seek(0)
2637 f.write('bbb')
2638 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002639 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002640
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002641 def test_errors_property(self):
2642 with self.open(support.TESTFN, "w") as f:
2643 self.assertEqual(f.errors, "strict")
2644 with self.open(support.TESTFN, "w", errors="replace") as f:
2645 self.assertEqual(f.errors, "replace")
2646
Brett Cannon31f59292011-02-21 19:29:56 +00002647 @support.no_tracing
Victor Stinner45df8202010-04-28 22:31:17 +00002648 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002649 def test_threads_write(self):
2650 # Issue6750: concurrent writes could duplicate data
2651 event = threading.Event()
2652 with self.open(support.TESTFN, "w", buffering=1) as f:
2653 def run(n):
2654 text = "Thread%03d\n" % n
2655 event.wait()
2656 f.write(text)
2657 threads = [threading.Thread(target=lambda n=x: run(n))
2658 for x in range(20)]
2659 for t in threads:
2660 t.start()
2661 time.sleep(0.02)
2662 event.set()
2663 for t in threads:
2664 t.join()
2665 with self.open(support.TESTFN) as f:
2666 content = f.read()
2667 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002668 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002669
Antoine Pitrou6be88762010-05-03 16:48:20 +00002670 def test_flush_error_on_close(self):
2671 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2672 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002673 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00002674 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002675 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06002676 self.assertTrue(txt.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00002677
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03002678 def test_close_error_on_close(self):
2679 buffer = self.BytesIO(self.testdata)
2680 def bad_flush():
2681 raise OSError('flush')
2682 def bad_close():
2683 raise OSError('close')
2684 buffer.close = bad_close
2685 txt = self.TextIOWrapper(buffer, encoding="ascii")
2686 txt.flush = bad_flush
2687 with self.assertRaises(OSError) as err: # exception not swallowed
2688 txt.close()
2689 self.assertEqual(err.exception.args, ('close',))
2690 self.assertIsInstance(err.exception.__context__, OSError)
2691 self.assertEqual(err.exception.__context__.args, ('flush',))
2692 self.assertFalse(txt.closed)
2693
2694 def test_nonnormalized_close_error_on_close(self):
2695 # Issue #21677
2696 buffer = self.BytesIO(self.testdata)
2697 def bad_flush():
2698 raise non_existing_flush
2699 def bad_close():
2700 raise non_existing_close
2701 buffer.close = bad_close
2702 txt = self.TextIOWrapper(buffer, encoding="ascii")
2703 txt.flush = bad_flush
2704 with self.assertRaises(NameError) as err: # exception not swallowed
2705 txt.close()
2706 self.assertIn('non_existing_close', str(err.exception))
2707 self.assertIsInstance(err.exception.__context__, NameError)
2708 self.assertIn('non_existing_flush', str(err.exception.__context__))
2709 self.assertFalse(txt.closed)
2710
Antoine Pitrou6be88762010-05-03 16:48:20 +00002711 def test_multi_close(self):
2712 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2713 txt.close()
2714 txt.close()
2715 txt.close()
2716 self.assertRaises(ValueError, txt.flush)
2717
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002718 def test_unseekable(self):
2719 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2720 self.assertRaises(self.UnsupportedOperation, txt.tell)
2721 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2722
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002723 def test_readonly_attributes(self):
2724 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2725 buf = self.BytesIO(self.testdata)
2726 with self.assertRaises(AttributeError):
2727 txt.buffer = buf
2728
Antoine Pitroue96ec682011-07-23 21:46:35 +02002729 def test_rawio(self):
2730 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
2731 # that subprocess.Popen() can have the required unbuffered
2732 # semantics with universal_newlines=True.
2733 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2734 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
2735 # Reads
2736 self.assertEqual(txt.read(4), 'abcd')
2737 self.assertEqual(txt.readline(), 'efghi\n')
2738 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
2739
2740 def test_rawio_write_through(self):
2741 # Issue #12591: with write_through=True, writes don't need a flush
2742 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2743 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
2744 write_through=True)
2745 txt.write('1')
2746 txt.write('23\n4')
2747 txt.write('5')
2748 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2749
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02002750 def test_bufio_write_through(self):
2751 # Issue #21396: write_through=True doesn't force a flush()
2752 # on the underlying binary buffered object.
2753 flush_called, write_called = [], []
2754 class BufferedWriter(self.BufferedWriter):
2755 def flush(self, *args, **kwargs):
2756 flush_called.append(True)
2757 return super().flush(*args, **kwargs)
2758 def write(self, *args, **kwargs):
2759 write_called.append(True)
2760 return super().write(*args, **kwargs)
2761
2762 rawio = self.BytesIO()
2763 data = b"a"
2764 bufio = BufferedWriter(rawio, len(data)*2)
2765 textio = self.TextIOWrapper(bufio, encoding='ascii',
2766 write_through=True)
2767 # write to the buffered io but don't overflow the buffer
2768 text = data.decode('ascii')
2769 textio.write(text)
2770
2771 # buffer.flush is not called with write_through=True
2772 self.assertFalse(flush_called)
2773 # buffer.write *is* called with write_through=True
2774 self.assertTrue(write_called)
2775 self.assertEqual(rawio.getvalue(), b"") # no flush
2776
2777 write_called = [] # reset
2778 textio.write(text * 10) # total content is larger than bufio buffer
2779 self.assertTrue(write_called)
2780 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
2781
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002782 def test_read_nonbytes(self):
2783 # Issue #17106
2784 # Crash when underlying read() returns non-bytes
2785 t = self.TextIOWrapper(self.StringIO('a'))
2786 self.assertRaises(TypeError, t.read, 1)
2787 t = self.TextIOWrapper(self.StringIO('a'))
2788 self.assertRaises(TypeError, t.readline)
2789 t = self.TextIOWrapper(self.StringIO('a'))
2790 self.assertRaises(TypeError, t.read)
2791
2792 def test_illegal_decoder(self):
2793 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10002794 # Bypass the early encoding check added in issue 20404
2795 def _make_illegal_wrapper():
2796 quopri = codecs.lookup("quopri")
2797 quopri._is_text_encoding = True
2798 try:
2799 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
2800 newline='\n', encoding="quopri")
2801 finally:
2802 quopri._is_text_encoding = False
2803 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002804 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10002805 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002806 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10002807 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002808 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10002809 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002810 self.assertRaises(TypeError, t.read)
2811
Antoine Pitrou712cb732013-12-21 15:51:54 +01002812 def _check_create_at_shutdown(self, **kwargs):
2813 # Issue #20037: creating a TextIOWrapper at shutdown
2814 # shouldn't crash the interpreter.
2815 iomod = self.io.__name__
2816 code = """if 1:
2817 import codecs
2818 import {iomod} as io
2819
2820 # Avoid looking up codecs at shutdown
2821 codecs.lookup('utf-8')
2822
2823 class C:
2824 def __init__(self):
2825 self.buf = io.BytesIO()
2826 def __del__(self):
2827 io.TextIOWrapper(self.buf, **{kwargs})
2828 print("ok")
2829 c = C()
2830 """.format(iomod=iomod, kwargs=kwargs)
2831 return assert_python_ok("-c", code)
2832
2833 def test_create_at_shutdown_without_encoding(self):
2834 rc, out, err = self._check_create_at_shutdown()
2835 if err:
2836 # Can error out with a RuntimeError if the module state
2837 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10002838 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01002839 else:
2840 self.assertEqual("ok", out.decode().strip())
2841
2842 def test_create_at_shutdown_with_encoding(self):
2843 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
2844 errors='strict')
2845 self.assertFalse(err)
2846 self.assertEqual("ok", out.decode().strip())
2847
Antoine Pitroub8503892014-04-29 10:14:02 +02002848 def test_read_byteslike(self):
2849 r = MemviewBytesIO(b'Just some random string\n')
2850 t = self.TextIOWrapper(r, 'utf-8')
2851
2852 # TextIOwrapper will not read the full string, because
2853 # we truncate it to a multiple of the native int size
2854 # so that we can construct a more complex memoryview.
2855 bytes_val = _to_memoryview(r.getvalue()).tobytes()
2856
2857 self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
2858
Benjamin Peterson6c14f232014-11-12 10:19:46 -05002859 def test_issue22849(self):
2860 class F(object):
2861 def readable(self): return True
2862 def writable(self): return True
2863 def seekable(self): return True
2864
2865 for i in range(10):
2866 try:
2867 self.TextIOWrapper(F(), encoding='utf-8')
2868 except Exception:
2869 pass
2870
2871 F.tell = lambda x: 0
2872 t = self.TextIOWrapper(F(), encoding='utf-8')
2873
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002874
Antoine Pitroub8503892014-04-29 10:14:02 +02002875class MemviewBytesIO(io.BytesIO):
2876 '''A BytesIO object whose read method returns memoryviews
2877 rather than bytes'''
2878
2879 def read1(self, len_):
2880 return _to_memoryview(super().read1(len_))
2881
2882 def read(self, len_):
2883 return _to_memoryview(super().read(len_))
2884
2885def _to_memoryview(buf):
2886 '''Convert bytes-object *buf* to a non-trivial memoryview'''
2887
2888 arr = array.array('i')
2889 idx = len(buf) - len(buf) % arr.itemsize
2890 arr.frombytes(buf[:idx])
2891 return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002892
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05002893
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002894class CTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01002895 io = io
Nick Coghlana9b15242014-02-04 22:11:18 +10002896 shutdown_error = "RuntimeError: could not find io module state"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002897
2898 def test_initialization(self):
2899 r = self.BytesIO(b"\xc3\xa9\n\n")
2900 b = self.BufferedReader(r, 1000)
2901 t = self.TextIOWrapper(b)
2902 self.assertRaises(TypeError, t.__init__, b, newline=42)
2903 self.assertRaises(ValueError, t.read)
2904 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2905 self.assertRaises(ValueError, t.read)
2906
2907 def test_garbage_collection(self):
2908 # C TextIOWrapper objects are collected, and collecting them flushes
2909 # all data to disk.
2910 # The Python version has __del__, so it ends in gc.garbage instead.
Antoine Pitrou796564c2013-07-30 19:59:21 +02002911 with support.check_warnings(('', ResourceWarning)):
2912 rawio = io.FileIO(support.TESTFN, "wb")
2913 b = self.BufferedWriter(rawio)
2914 t = self.TextIOWrapper(b, encoding="ascii")
2915 t.write("456def")
2916 t.x = t
2917 wr = weakref.ref(t)
2918 del t
2919 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002920 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002921 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002922 self.assertEqual(f.read(), b"456def")
2923
Charles-François Natali42c28cd2011-10-05 19:53:43 +02002924 def test_rwpair_cleared_before_textio(self):
2925 # Issue 13070: TextIOWrapper's finalization would crash when called
2926 # after the reference to the underlying BufferedRWPair's writer got
2927 # cleared by the GC.
2928 for i in range(1000):
2929 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2930 t1 = self.TextIOWrapper(b1, encoding="ascii")
2931 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2932 t2 = self.TextIOWrapper(b2, encoding="ascii")
2933 # circular references
2934 t1.buddy = t2
2935 t2.buddy = t1
2936 support.gc_collect()
2937
2938
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002939class PyTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01002940 io = pyio
Serhiy Storchakad667d722014-02-10 19:09:19 +02002941 #shutdown_error = "LookupError: unknown encoding: ascii"
2942 shutdown_error = "TypeError: 'NoneType' object is not iterable"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002943
2944
2945class IncrementalNewlineDecoderTest(unittest.TestCase):
2946
2947 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002948 # UTF-8 specific tests for a newline decoder
2949 def _check_decode(b, s, **kwargs):
2950 # We exercise getstate() / setstate() as well as decode()
2951 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002952 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002953 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002954 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002955
Antoine Pitrou180a3362008-12-14 16:36:46 +00002956 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002957
Antoine Pitrou180a3362008-12-14 16:36:46 +00002958 _check_decode(b'\xe8', "")
2959 _check_decode(b'\xa2', "")
2960 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002961
Antoine Pitrou180a3362008-12-14 16:36:46 +00002962 _check_decode(b'\xe8', "")
2963 _check_decode(b'\xa2', "")
2964 _check_decode(b'\x88', "\u8888")
2965
2966 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002967 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2968
Antoine Pitrou180a3362008-12-14 16:36:46 +00002969 decoder.reset()
2970 _check_decode(b'\n', "\n")
2971 _check_decode(b'\r', "")
2972 _check_decode(b'', "\n", final=True)
2973 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002974
Antoine Pitrou180a3362008-12-14 16:36:46 +00002975 _check_decode(b'\r', "")
2976 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002977
Antoine Pitrou180a3362008-12-14 16:36:46 +00002978 _check_decode(b'\r\r\n', "\n\n")
2979 _check_decode(b'\r', "")
2980 _check_decode(b'\r', "\n")
2981 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002982
Antoine Pitrou180a3362008-12-14 16:36:46 +00002983 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2984 _check_decode(b'\xe8\xa2\x88', "\u8888")
2985 _check_decode(b'\n', "\n")
2986 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2987 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002988
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002989 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002990 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002991 if encoding is not None:
2992 encoder = codecs.getincrementalencoder(encoding)()
2993 def _decode_bytewise(s):
2994 # Decode one byte at a time
2995 for b in encoder.encode(s):
2996 result.append(decoder.decode(bytes([b])))
2997 else:
2998 encoder = None
2999 def _decode_bytewise(s):
3000 # Decode one char at a time
3001 for c in s:
3002 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00003003 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003004 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003005 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00003006 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003007 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003008 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003009 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003010 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003011 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003012 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003013 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00003014 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003015 input = "abc"
3016 if encoder is not None:
3017 encoder.reset()
3018 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003019 self.assertEqual(decoder.decode(input), "abc")
3020 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003021
3022 def test_newline_decoder(self):
3023 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003024 # None meaning the IncrementalNewlineDecoder takes unicode input
3025 # rather than bytes input
3026 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00003027 'utf-16', 'utf-16-le', 'utf-16-be',
3028 'utf-32', 'utf-32-le', 'utf-32-be',
3029 )
3030 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003031 decoder = enc and codecs.getincrementaldecoder(enc)()
3032 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3033 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003034 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003035 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3036 self.check_newline_decoding_utf8(decoder)
3037
Antoine Pitrou66913e22009-03-06 23:40:56 +00003038 def test_newline_bytes(self):
3039 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3040 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003041 self.assertEqual(dec.newlines, None)
3042 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3043 self.assertEqual(dec.newlines, None)
3044 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3045 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00003046 dec = self.IncrementalNewlineDecoder(None, translate=False)
3047 _check(dec)
3048 dec = self.IncrementalNewlineDecoder(None, translate=True)
3049 _check(dec)
3050
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003051class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3052 pass
3053
3054class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3055 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003056
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003057
Guido van Rossum01a27522007-03-07 01:00:12 +00003058# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003059
Guido van Rossum5abbf752007-08-27 17:39:33 +00003060class MiscIOTest(unittest.TestCase):
3061
Barry Warsaw40e82462008-11-20 20:14:50 +00003062 def tearDown(self):
3063 support.unlink(support.TESTFN)
3064
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003065 def test___all__(self):
3066 for name in self.io.__all__:
3067 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003068 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00003069 if name == "open":
3070 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003071 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003072 self.assertTrue(issubclass(obj, Exception), name)
3073 elif not name.startswith("SEEK_"):
3074 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003075
Barry Warsaw40e82462008-11-20 20:14:50 +00003076 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003077 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003078 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003079 f.close()
3080
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +02003081 with support.check_warnings(('', DeprecationWarning)):
3082 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003083 self.assertEqual(f.name, support.TESTFN)
3084 self.assertEqual(f.buffer.name, support.TESTFN)
3085 self.assertEqual(f.buffer.raw.name, support.TESTFN)
3086 self.assertEqual(f.mode, "U")
3087 self.assertEqual(f.buffer.mode, "rb")
3088 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003089 f.close()
3090
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003091 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003092 self.assertEqual(f.mode, "w+")
3093 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
3094 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00003095
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003096 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003097 self.assertEqual(g.mode, "wb")
3098 self.assertEqual(g.raw.mode, "wb")
3099 self.assertEqual(g.name, f.fileno())
3100 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00003101 f.close()
3102 g.close()
3103
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003104 def test_io_after_close(self):
3105 for kwargs in [
3106 {"mode": "w"},
3107 {"mode": "wb"},
3108 {"mode": "w", "buffering": 1},
3109 {"mode": "w", "buffering": 2},
3110 {"mode": "wb", "buffering": 0},
3111 {"mode": "r"},
3112 {"mode": "rb"},
3113 {"mode": "r", "buffering": 1},
3114 {"mode": "r", "buffering": 2},
3115 {"mode": "rb", "buffering": 0},
3116 {"mode": "w+"},
3117 {"mode": "w+b"},
3118 {"mode": "w+", "buffering": 1},
3119 {"mode": "w+", "buffering": 2},
3120 {"mode": "w+b", "buffering": 0},
3121 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003122 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003123 f.close()
3124 self.assertRaises(ValueError, f.flush)
3125 self.assertRaises(ValueError, f.fileno)
3126 self.assertRaises(ValueError, f.isatty)
3127 self.assertRaises(ValueError, f.__iter__)
3128 if hasattr(f, "peek"):
3129 self.assertRaises(ValueError, f.peek, 1)
3130 self.assertRaises(ValueError, f.read)
3131 if hasattr(f, "read1"):
3132 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02003133 if hasattr(f, "readall"):
3134 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003135 if hasattr(f, "readinto"):
3136 self.assertRaises(ValueError, f.readinto, bytearray(1024))
Benjamin Petersona96fea02014-06-22 14:17:44 -07003137 if hasattr(f, "readinto1"):
3138 self.assertRaises(ValueError, f.readinto1, bytearray(1024))
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003139 self.assertRaises(ValueError, f.readline)
3140 self.assertRaises(ValueError, f.readlines)
3141 self.assertRaises(ValueError, f.seek, 0)
3142 self.assertRaises(ValueError, f.tell)
3143 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003144 self.assertRaises(ValueError, f.write,
3145 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003146 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003147 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003148
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003149 def test_blockingioerror(self):
3150 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003151 class C(str):
3152 pass
3153 c = C("")
3154 b = self.BlockingIOError(1, c)
3155 c.b = b
3156 b.c = c
3157 wr = weakref.ref(c)
3158 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00003159 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003160 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003161
3162 def test_abcs(self):
3163 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00003164 self.assertIsInstance(self.IOBase, abc.ABCMeta)
3165 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3166 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3167 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003168
3169 def _check_abc_inheritance(self, abcmodule):
3170 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003171 self.assertIsInstance(f, abcmodule.IOBase)
3172 self.assertIsInstance(f, abcmodule.RawIOBase)
3173 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3174 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003175 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003176 self.assertIsInstance(f, abcmodule.IOBase)
3177 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3178 self.assertIsInstance(f, abcmodule.BufferedIOBase)
3179 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003180 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003181 self.assertIsInstance(f, abcmodule.IOBase)
3182 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3183 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3184 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003185
3186 def test_abc_inheritance(self):
3187 # Test implementations inherit from their respective ABCs
3188 self._check_abc_inheritance(self)
3189
3190 def test_abc_inheritance_official(self):
3191 # Test implementations inherit from the official ABCs of the
3192 # baseline "io" module.
3193 self._check_abc_inheritance(io)
3194
Antoine Pitroue033e062010-10-29 10:38:18 +00003195 def _check_warn_on_dealloc(self, *args, **kwargs):
3196 f = open(*args, **kwargs)
3197 r = repr(f)
3198 with self.assertWarns(ResourceWarning) as cm:
3199 f = None
3200 support.gc_collect()
3201 self.assertIn(r, str(cm.warning.args[0]))
3202
3203 def test_warn_on_dealloc(self):
3204 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
3205 self._check_warn_on_dealloc(support.TESTFN, "wb")
3206 self._check_warn_on_dealloc(support.TESTFN, "w")
3207
3208 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
3209 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00003210 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00003211 for fd in fds:
3212 try:
3213 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02003214 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00003215 if e.errno != errno.EBADF:
3216 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00003217 self.addCleanup(cleanup_fds)
3218 r, w = os.pipe()
3219 fds += r, w
3220 self._check_warn_on_dealloc(r, *args, **kwargs)
3221 # When using closefd=False, there's no warning
3222 r, w = os.pipe()
3223 fds += r, w
3224 with warnings.catch_warnings(record=True) as recorded:
3225 open(r, *args, closefd=False, **kwargs)
3226 support.gc_collect()
3227 self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00003228
3229 def test_warn_on_dealloc_fd(self):
3230 self._check_warn_on_dealloc_fd("rb", buffering=0)
3231 self._check_warn_on_dealloc_fd("rb")
3232 self._check_warn_on_dealloc_fd("r")
3233
3234
Antoine Pitrou243757e2010-11-05 21:15:39 +00003235 def test_pickling(self):
3236 # Pickling file objects is forbidden
3237 for kwargs in [
3238 {"mode": "w"},
3239 {"mode": "wb"},
3240 {"mode": "wb", "buffering": 0},
3241 {"mode": "r"},
3242 {"mode": "rb"},
3243 {"mode": "rb", "buffering": 0},
3244 {"mode": "w+"},
3245 {"mode": "w+b"},
3246 {"mode": "w+b", "buffering": 0},
3247 ]:
3248 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
3249 with self.open(support.TESTFN, **kwargs) as f:
3250 self.assertRaises(TypeError, pickle.dumps, f, protocol)
3251
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003252 def test_nonblock_pipe_write_bigbuf(self):
3253 self._test_nonblock_pipe_write(16*1024)
3254
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003255 def test_nonblock_pipe_write_smallbuf(self):
3256 self._test_nonblock_pipe_write(1024)
3257
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003258 @unittest.skipUnless(hasattr(os, 'set_blocking'),
3259 'os.set_blocking() required for this test')
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003260 def _test_nonblock_pipe_write(self, bufsize):
3261 sent = []
3262 received = []
3263 r, w = os.pipe()
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003264 os.set_blocking(r, False)
3265 os.set_blocking(w, False)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003266
3267 # To exercise all code paths in the C implementation we need
3268 # to play with buffer sizes. For instance, if we choose a
3269 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
3270 # then we will never get a partial write of the buffer.
3271 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
3272 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
3273
3274 with rf, wf:
3275 for N in 9999, 73, 7574:
3276 try:
3277 i = 0
3278 while True:
3279 msg = bytes([i % 26 + 97]) * N
3280 sent.append(msg)
3281 wf.write(msg)
3282 i += 1
3283
3284 except self.BlockingIOError as e:
3285 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003286 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003287 sent[-1] = sent[-1][:e.characters_written]
3288 received.append(rf.read())
3289 msg = b'BLOCKED'
3290 wf.write(msg)
3291 sent.append(msg)
3292
3293 while True:
3294 try:
3295 wf.flush()
3296 break
3297 except self.BlockingIOError as e:
3298 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003299 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003300 self.assertEqual(e.characters_written, 0)
3301 received.append(rf.read())
3302
3303 received += iter(rf.read, None)
3304
3305 sent, received = b''.join(sent), b''.join(received)
3306 self.assertTrue(sent == received)
3307 self.assertTrue(wf.closed)
3308 self.assertTrue(rf.closed)
3309
Charles-François Natalidc3044c2012-01-09 22:40:02 +01003310 def test_create_fail(self):
3311 # 'x' mode fails if file is existing
3312 with self.open(support.TESTFN, 'w'):
3313 pass
3314 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
3315
3316 def test_create_writes(self):
3317 # 'x' mode opens for writing
3318 with self.open(support.TESTFN, 'xb') as f:
3319 f.write(b"spam")
3320 with self.open(support.TESTFN, 'rb') as f:
3321 self.assertEqual(b"spam", f.read())
3322
Christian Heimes7b648752012-09-10 14:48:43 +02003323 def test_open_allargs(self):
3324 # there used to be a buffer overflow in the parser for rawmode
3325 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
3326
3327
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003328class CMiscIOTest(MiscIOTest):
3329 io = io
3330
Serhiy Storchaka37a79a12013-05-28 16:24:45 +03003331 def test_readinto_buffer_overflow(self):
3332 # Issue #18025
3333 class BadReader(self.io.BufferedIOBase):
3334 def read(self, n=-1):
3335 return b'x' * 10**6
3336 bufio = BadReader()
3337 b = bytearray(2)
3338 self.assertRaises(ValueError, bufio.readinto, b)
3339
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003340class PyMiscIOTest(MiscIOTest):
3341 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003342
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003343
3344@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3345class SignalsTest(unittest.TestCase):
3346
3347 def setUp(self):
3348 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
3349
3350 def tearDown(self):
3351 signal.signal(signal.SIGALRM, self.oldalrm)
3352
3353 def alarm_interrupt(self, sig, frame):
3354 1/0
3355
3356 @unittest.skipUnless(threading, 'Threading required for this test.')
3357 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
3358 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00003359 invokes the signal handler, and bubbles up the exception raised
3360 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003361 read_results = []
3362 def _read():
Victor Stinnera9293352011-04-30 15:21:58 +02003363 if hasattr(signal, 'pthread_sigmask'):
3364 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003365 s = os.read(r, 1)
3366 read_results.append(s)
3367 t = threading.Thread(target=_read)
3368 t.daemon = True
3369 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00003370 fdopen_kwargs["closefd"] = False
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003371 try:
3372 wio = self.io.open(w, **fdopen_kwargs)
3373 t.start()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003374 # Fill the pipe enough that the write will be blocking.
3375 # It will be interrupted by the timer armed above. Since the
3376 # other thread has read one byte, the low-level write will
3377 # return with a successful (partial) result rather than an EINTR.
3378 # The buffered IO layer must check for pending signal
3379 # handlers, which in this case will invoke alarm_interrupt().
Victor Stinner775b2dd2013-07-15 19:53:13 +02003380 signal.alarm(1)
3381 try:
3382 self.assertRaises(ZeroDivisionError,
3383 wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
3384 finally:
3385 signal.alarm(0)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003386 t.join()
3387 # We got one byte, get another one and check that it isn't a
3388 # repeat of the first one.
3389 read_results.append(os.read(r, 1))
3390 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3391 finally:
3392 os.close(w)
3393 os.close(r)
3394 # This is deliberate. If we didn't close the file descriptor
3395 # before closing wio, wio would try to flush its internal
3396 # buffer, and block again.
3397 try:
3398 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003399 except OSError as e:
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003400 if e.errno != errno.EBADF:
3401 raise
3402
3403 def test_interrupted_write_unbuffered(self):
3404 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3405
3406 def test_interrupted_write_buffered(self):
3407 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3408
Victor Stinner6ab72862014-09-03 23:32:28 +02003409 # Issue #22331: The test hangs on FreeBSD 7.2
3410 @support.requires_freebsd_version(8)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003411 def test_interrupted_write_text(self):
3412 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3413
Brett Cannon31f59292011-02-21 19:29:56 +00003414 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003415 def check_reentrant_write(self, data, **fdopen_kwargs):
3416 def on_alarm(*args):
3417 # Will be called reentrantly from the same thread
3418 wio.write(data)
3419 1/0
3420 signal.signal(signal.SIGALRM, on_alarm)
3421 r, w = os.pipe()
3422 wio = self.io.open(w, **fdopen_kwargs)
3423 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003424 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003425 # Either the reentrant call to wio.write() fails with RuntimeError,
3426 # or the signal handler raises ZeroDivisionError.
3427 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3428 while 1:
3429 for i in range(100):
3430 wio.write(data)
3431 wio.flush()
3432 # Make sure the buffer doesn't fill up and block further writes
3433 os.read(r, len(data) * 100)
3434 exc = cm.exception
3435 if isinstance(exc, RuntimeError):
3436 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3437 finally:
3438 wio.close()
3439 os.close(r)
3440
3441 def test_reentrant_write_buffered(self):
3442 self.check_reentrant_write(b"xy", mode="wb")
3443
3444 def test_reentrant_write_text(self):
3445 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3446
Antoine Pitrou707ce822011-02-25 21:24:11 +00003447 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3448 """Check that a buffered read, when it gets interrupted (either
3449 returning a partial result or EINTR), properly invokes the signal
3450 handler and retries if the latter returned successfully."""
3451 r, w = os.pipe()
3452 fdopen_kwargs["closefd"] = False
3453 def alarm_handler(sig, frame):
3454 os.write(w, b"bar")
3455 signal.signal(signal.SIGALRM, alarm_handler)
3456 try:
3457 rio = self.io.open(r, **fdopen_kwargs)
3458 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003459 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003460 # Expected behaviour:
3461 # - first raw read() returns partial b"foo"
3462 # - second raw read() returns EINTR
3463 # - third raw read() returns b"bar"
3464 self.assertEqual(decode(rio.read(6)), "foobar")
3465 finally:
3466 rio.close()
3467 os.close(w)
3468 os.close(r)
3469
Antoine Pitrou20db5112011-08-19 20:32:34 +02003470 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003471 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3472 mode="rb")
3473
Antoine Pitrou20db5112011-08-19 20:32:34 +02003474 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003475 self.check_interrupted_read_retry(lambda x: x,
3476 mode="r")
3477
3478 @unittest.skipUnless(threading, 'Threading required for this test.')
3479 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3480 """Check that a buffered write, when it gets interrupted (either
3481 returning a partial result or EINTR), properly invokes the signal
3482 handler and retries if the latter returned successfully."""
3483 select = support.import_module("select")
3484 # A quantity that exceeds the buffer size of an anonymous pipe's
3485 # write end.
Antoine Pitroue1a16742013-04-24 23:31:38 +02003486 N = support.PIPE_MAX_SIZE
Antoine Pitrou707ce822011-02-25 21:24:11 +00003487 r, w = os.pipe()
3488 fdopen_kwargs["closefd"] = False
3489 # We need a separate thread to read from the pipe and allow the
3490 # write() to finish. This thread is started after the SIGALRM is
3491 # received (forcing a first EINTR in write()).
3492 read_results = []
3493 write_finished = False
3494 def _read():
3495 while not write_finished:
3496 while r in select.select([r], [], [], 1.0)[0]:
3497 s = os.read(r, 1024)
3498 read_results.append(s)
3499 t = threading.Thread(target=_read)
3500 t.daemon = True
3501 def alarm1(sig, frame):
3502 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003503 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003504 def alarm2(sig, frame):
3505 t.start()
3506 signal.signal(signal.SIGALRM, alarm1)
3507 try:
3508 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003509 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003510 # Expected behaviour:
3511 # - first raw write() is partial (because of the limited pipe buffer
3512 # and the first alarm)
3513 # - second raw write() returns EINTR (because of the second alarm)
3514 # - subsequent write()s are successful (either partial or complete)
3515 self.assertEqual(N, wio.write(item * N))
3516 wio.flush()
3517 write_finished = True
3518 t.join()
3519 self.assertEqual(N, sum(len(x) for x in read_results))
3520 finally:
3521 write_finished = True
3522 os.close(w)
3523 os.close(r)
3524 # This is deliberate. If we didn't close the file descriptor
3525 # before closing wio, wio would try to flush its internal
3526 # buffer, and could block (in case of failure).
3527 try:
3528 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003529 except OSError as e:
Antoine Pitrou707ce822011-02-25 21:24:11 +00003530 if e.errno != errno.EBADF:
3531 raise
3532
Antoine Pitrou20db5112011-08-19 20:32:34 +02003533 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003534 self.check_interrupted_write_retry(b"x", mode="wb")
3535
Antoine Pitrou20db5112011-08-19 20:32:34 +02003536 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003537 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3538
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003539
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003540class CSignalsTest(SignalsTest):
3541 io = io
3542
3543class PySignalsTest(SignalsTest):
3544 io = pyio
3545
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003546 # Handling reentrancy issues would slow down _pyio even more, so the
3547 # tests are disabled.
3548 test_reentrant_write_buffered = None
3549 test_reentrant_write_text = None
3550
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003551
Ezio Melottidaa42c72013-03-23 16:30:16 +02003552def load_tests(*args):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003553 tests = (CIOTest, PyIOTest,
3554 CBufferedReaderTest, PyBufferedReaderTest,
3555 CBufferedWriterTest, PyBufferedWriterTest,
3556 CBufferedRWPairTest, PyBufferedRWPairTest,
3557 CBufferedRandomTest, PyBufferedRandomTest,
3558 StatefulIncrementalDecoderTest,
3559 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3560 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003561 CMiscIOTest, PyMiscIOTest,
3562 CSignalsTest, PySignalsTest,
3563 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003564
3565 # Put the namespaces of the IO module we are testing and some useful mock
3566 # classes in the __dict__ of each test.
3567 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou328ec742010-09-14 18:37:24 +00003568 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003569 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3570 c_io_ns = {name : getattr(io, name) for name in all_members}
3571 py_io_ns = {name : getattr(pyio, name) for name in all_members}
3572 globs = globals()
3573 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3574 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3575 # Avoid turning open into a bound method.
3576 py_io_ns["open"] = pyio.OpenWrapper
3577 for test in tests:
3578 if test.__name__.startswith("C"):
3579 for name, obj in c_io_ns.items():
3580 setattr(test, name, obj)
3581 elif test.__name__.startswith("Py"):
3582 for name, obj in py_io_ns.items():
3583 setattr(test, name, obj)
3584
Ezio Melottidaa42c72013-03-23 16:30:16 +02003585 suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests])
3586 return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00003587
3588if __name__ == "__main__":
Ezio Melottidaa42c72013-03-23 16:30:16 +02003589 unittest.main()