blob: c867502e9bda27cc74f3798de5b6a7b36e28548c [file] [log] [blame]
"""Unit tests for the io module."""

# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)

################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Antoine Pitrou712cb732013-12-21 15:51:54 +010038from test.script_helper import assert_python_ok
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000047
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000048def _default_chunk_size():
49 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000050 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000051 return f._CHUNK_SIZE
52
53
class MockRawIOWithoutRead:
    """Scripted raw stream that deliberately defines no read() method.

    Because read() is absent, a RawIOBase mixed in after this class has to
    fall back on its default read(), which is built on readinto().

    The constructor takes an iterable of chunks to serve, in order; a
    ``None`` entry makes the corresponding readinto() call return None.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)   # chunks still to be served
        self._write_stack = []                # everything handed to write()
        self._reads = 0                       # number of read attempts
        self._extraneous_reads = 0            # attempts made on an empty script

    # -- capability queries -------------------------------------------------

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def fileno(self):
        return 42

    # -- positioning (dummy answers) ----------------------------------------

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def truncate(self, pos=None):
        return pos

    # -- data transfer ------------------------------------------------------

    def write(self, b):
        """Record a bytes copy of *b* and report ``len(b)`` as written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def readinto(self, buf):
        """Copy the next scripted chunk into *buf*.

        Returns the number of bytes stored, None when the next scripted
        entry is None, or 0 once the script is exhausted.  A chunk larger
        than *buf* is split; the unread tail stays at the head of the script.
        """
        self._reads += 1
        capacity = len(buf)
        if not self._read_stack:
            self._extraneous_reads += 1
            return 0
        chunk = self._read_stack[0]
        if chunk is None:
            self._read_stack.pop(0)
            return None
        size = len(chunk)
        if size > capacity:
            # Only part of the chunk fits: serve the head, keep the tail.
            buf[:] = chunk[:capacity]
            self._read_stack[0] = chunk[capacity:]
            return capacity
        self._read_stack.pop(0)
        buf[:size] = chunk
        return size
109
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead combined with RawIOBase from ``io`` (C version)."""


class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead combined with RawIOBase from ``_pyio`` (Python version)."""
115
116
class MockRawIO(MockRawIOWithoutRead):
    """Scripted raw stream that also provides its own read()."""

    def read(self, n=None):
        """Return the next scripted chunk whole, ignoring *n*.

        Once the script is exhausted, b"" is returned and the call is
        counted as an extraneous read.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an empty script is expected here; a bare ``except`` would
            # also swallow KeyboardInterrupt and SystemExit.
            self._extraneous_reads += 1
            return b""
126
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO combined with RawIOBase from ``io`` (C version)."""


class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO combined with RawIOBase from ``_pyio`` (Python version)."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000132
Guido van Rossuma9e20242007-03-08 00:43:48 +0000133
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods report deliberately wrong results."""

    def seek(self, pos, whence):
        # A negative position, whatever was asked for.
        return -123

    def tell(self):
        # Likewise negative.
        return -456

    def read(self, n=None):
        # Hand back the parent's chunk repeated twice.
        chunk = super().read(n)
        return chunk * 2

    def readinto(self, buf):
        # Let the parent fill the buffer, then claim five times its size.
        super().readinto(buf)
        claimed = len(buf) * 5
        return claimed

    def write(self, b):
        # Report twice the count the parent reported.
        written = super().write(b)
        return written * 2
150
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO combined with RawIOBase from ``io`` (C version)."""


class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO combined with RawIOBase from ``_pyio`` (Python version)."""
156
157
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() raises OSError; later calls do nothing."""

    # Class-level default; close() shadows it with an instance attribute.
    closed = 0

    def close(self):
        if self.closed:
            return
        # Mark as closed *before* failing, so a repeated close() is silent.
        self.closed = 1
        raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000165
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO combined with RawIOBase from ``io`` (C version)."""


class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO combined with RawIOBase from ``_pyio`` (Python version)."""
171
172
class MockFileIO:
    """Mixin that logs the outcome of every read()/readinto() call.

    It must be combined with a class that actually implements
    ``__init__(data)``, ``read`` and ``readinto``; all three are reached
    through super().
    """

    def __init__(self, data):
        # One entry per call: the byte count, or None when read() gave None.
        self.read_history = []
        super().__init__(data)

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count

    def read(self, n=None):
        chunk = super().read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk
Guido van Rossum78892e42007-04-06 17:31:18 +0000188
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO backed by BytesIO from ``io`` (C version)."""


class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO backed by BytesIO from ``_pyio`` (Python version)."""
194
195
class MockUnseekableIO:
    """Mixin that makes a stream report itself as unseekable.

    The class it is mixed into must provide an ``UnsupportedOperation``
    attribute holding the exception type to raise.
    """

    def seekable(self):
        return False

    def tell(self, *args):
        failure = self.UnsupportedOperation("not seekable")
        raise failure

    def seek(self, *args):
        failure = self.UnsupportedOperation("not seekable")
        raise failure
205
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """Unseekable BytesIO built on ``io`` (C version)."""

    # Exception type raised by the mixin's seek() and tell().
    UnsupportedOperation = io.UnsupportedOperation


class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """Unseekable BytesIO built on ``_pyio`` (Python version)."""

    # Exception type raised by the mixin's seek() and tell().
    UnsupportedOperation = pyio.UnsupportedOperation
211
212
class MockNonBlockWriterIO:
    """Writer that can pretend it would block at a chosen byte sequence."""

    def __init__(self):
        self._write_stack = []       # chunks accepted so far
        self._blocker_char = None    # sequence that triggers the fake block

    # -- capability queries -------------------------------------------------

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    # -- test controls ------------------------------------------------------

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def pop_written(self):
        """Return everything accepted so far, joined, and forget it."""
        joined = b"".join(self._write_stack)
        self._write_stack.clear()
        return joined

    # -- the write path -----------------------------------------------------

    def write(self, b):
        """Accept *b*, stopping short at the blocker if one is armed.

        Returns the number of bytes accepted.  When the data starts with
        the blocker, nothing is accepted, the blocker is disarmed and None
        is returned to signal "would block".
        """
        payload = bytes(b)
        blocker = self._blocker_char
        if blocker:
            where = payload.find(blocker)
            if where > 0:
                # write data up to the first blocker
                self._write_stack.append(payload[:where])
                return where
            if where == 0:
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        self._write_stack.append(payload)
        return len(payload)
256
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO combined with RawIOBase from ``io`` (C version)."""

    # The matching implementation's BlockingIOError, exposed on the class.
    BlockingIOError = io.BlockingIOError


class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO combined with RawIOBase from ``_pyio`` (Python version)."""

    # The matching implementation's BlockingIOError, exposed on the class.
    BlockingIOError = pyio.BlockingIOError
262
Guido van Rossuma9e20242007-03-08 00:43:48 +0000263
class IOTest(unittest.TestCase):
    """Implementation-independent tests of basic file and stream behaviour.

    The tests reach the objects under test through attributes of ``self``
    (``open``, ``FileIO``, ``BytesIO``, ``StringIO``, ``IOBase``,
    ``RawIOBase``, ``BufferedIOBase``, ``TextIOBase``,
    ``UnsupportedOperation``, ``SEEK_CUR``, ``SEEK_END``,
    ``MockRawIOWithoutRead``).  None of them is defined in this class; they
    have to be supplied from outside it.
    """

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        """Run a fixed write/seek/truncate script against the writable *f*."""
        self.assertEqual(f.write(b"blah."), 5)
        f.truncate(0)
        # truncate() must not move the file position.
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        # Again: the position is left where it was.
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Run a fixed read/seek script against *f*.

        *f* must contain b"hello world\\n".  The argument-less read() calls
        are only made when *buffered* is true.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by large_file_ops(); just past the signed 32-bit range.
    LARGE = 2**31

    def large_file_ops(self, f):
        """Seek, write and truncate around the ``LARGE`` offset of *f*."""
        # Real assertions rather than ``assert`` statements, so the checks
        # are not stripped when running under -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        exc = self.UnsupportedOperation
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(exc, fp.read)
                self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "wb", buffering=0) as fp:
            self.assertRaises(exc, fp.read)
            self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "rb", buffering=0) as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(exc, fp.write, "blah")
            self.assertRaises(exc, fp.writelines, ["blah\n"])
            # Non-zero seeking from current or end pos
            self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
            self.assertRaises(exc, fp.seek, -1, self.SEEK_END)

    def test_open_handles_NUL_chars(self):
        fn_with_NUL = 'foo\0bar'
        self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
        self.assertRaises(ValueError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')

    def test_raw_file_io(self):
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; It takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform.startswith('win') or sys.platform == 'darwin':
            support.requires(
                'largefile',
                'test requires %s bytes and a long time to run' % self.LARGE)
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                # The file must be closed even though the body raised.
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        with support.check_warnings(('', ResourceWarning)):
            f = MyFileIO(support.TESTFN, "wb")
            f.write(b"xxx")
            del f
            support.gc_collect()
            # Expected order: __del__, then close(), then flush().
            self.assertEqual(record, [1, 2, 3])
            with self.open(support.TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        """Check that destroying a *base* subclass runs __del__, close, flush."""
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        a = array.array('i', range(10))
        # write() must report bytes written, not array items.
        n = len(a.tobytes())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        with support.check_warnings(('', ResourceWarning)):
            f = self.FileIO(support.TESTFN, "wb")
            f.write(b"abcxxx")
            # Self-reference: only the cycle collector can free the object.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise OSError()
        f.flush = bad_flush
        self.assertRaises(OSError, f.close) # exception not swallowed
        self.assertTrue(f.closed)

    def test_multi_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")

    def test_types_have_dict(self):
        test = (
            self.IOBase(),
            self.RawIOBase(),
            self.TextIOBase(),
            self.StringIO(),
            self.BytesIO()
        )
        for obj in test:
            self.assertTrue(hasattr(obj, "__dict__"))

    def test_opener(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        fd = os.open(support.TESTFN, os.O_RDONLY)
        def opener(path, flags):
            # Ignore the requested path and hand back the prepared fd.
            return fd
        with self.open("non-existent", "r", opener=opener) as f:
            self.assertEqual(f.read(), "egg\n")

    def test_fileio_closefd(self):
        # Issue #4841
        with self.open(__file__, 'rb') as f1, \
             self.open(__file__, 'rb') as f2:
            fileio = self.FileIO(f1.fileno(), closefd=False)
            # .__init__() must not close f1
            fileio.__init__(f2.fileno(), closefd=False)
            f1.readline()
            # .close() must not close f2
            fileio.close()
            f2.readline()

    def test_nonbuffered_textio(self):
        # The failed open() must not leave anything behind that warns
        # when it is collected.
        with warnings.catch_warnings(record=True) as recorded:
            with self.assertRaises(ValueError):
                self.open(support.TESTFN, 'w', buffering=0)
            support.gc_collect()
        self.assertEqual(recorded, [])

    def test_invalid_newline(self):
        # Same check as above, for a rejected newline argument.
        with warnings.catch_warnings(record=True) as recorded:
            with self.assertRaises(ValueError):
                self.open(support.TESTFN, 'w', newline='invalid')
            support.gc_collect()
        self.assertEqual(recorded, [])
665
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200666
class CIOTest(IOTest):
    """IOTest plus a regression test for IOBase finalization."""

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        # Self-reference, so the object can only be freed by the cycle GC.
        obj.obj = obj
        wr = weakref.ref(obj)
        del MyIO
        del obj
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000686
class PyIOTest(IOTest):
    """IOTest with nothing added or overridden."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000689
Guido van Rossuma9e20242007-03-08 00:43:48 +0000690
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000691class CommonBufferedTests:
692 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
693
def test_detach(self):
    # detach() hands back the wrapped raw object and works only once.
    underlying = self.MockRawIO()
    buffered = self.tp(underlying)
    detached = buffered.detach()
    self.assertIs(detached, underlying)
    self.assertRaises(ValueError, buffered.detach)

    # repr() of the detached object should still work
    repr(buffered)
701
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000702 def test_fileno(self):
703 rawio = self.MockRawIO()
704 bufio = self.tp(rawio)
705
Ezio Melottib3aedd42010-11-20 19:04:17 +0000706 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000707
Zachary Ware9fe6d862013-12-08 00:20:35 -0600708 @unittest.skip('test having existential crisis')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000709 def test_no_fileno(self):
710 # XXX will we always have fileno() function? If so, kill
711 # this test. Else, write it.
712 pass
713
714 def test_invalid_args(self):
715 rawio = self.MockRawIO()
716 bufio = self.tp(rawio)
717 # Invalid whence
718 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +0200719 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000720
721 def test_override_destructor(self):
722 tp = self.tp
723 record = []
724 class MyBufferedIO(tp):
725 def __del__(self):
726 record.append(1)
727 try:
728 f = super().__del__
729 except AttributeError:
730 pass
731 else:
732 f()
733 def close(self):
734 record.append(2)
735 super().close()
736 def flush(self):
737 record.append(3)
738 super().flush()
739 rawio = self.MockRawIO()
740 bufio = MyBufferedIO(rawio)
741 writable = bufio.writable()
742 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000743 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000744 if writable:
745 self.assertEqual(record, [1, 2, 3])
746 else:
747 self.assertEqual(record, [1, 2])
748
749 def test_context_manager(self):
750 # Test usability as a context manager
751 rawio = self.MockRawIO()
752 bufio = self.tp(rawio)
753 def _with():
754 with bufio:
755 pass
756 _with()
757 # bufio should now be closed, and using it a second time should raise
758 # a ValueError.
759 self.assertRaises(ValueError, _with)
760
761 def test_error_through_destructor(self):
762 # Test that the exception state is not modified by a destructor,
763 # even if close() fails.
764 rawio = self.CloseFailureIO()
765 def f():
766 self.tp(rawio).xyzzy
767 with support.captured_output("stderr") as s:
768 self.assertRaises(AttributeError, f)
769 s = s.getvalue().strip()
770 if s:
771 # The destructor *may* have printed an unraisable error, check it
772 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200773 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000774 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000775
Antoine Pitrou716c4442009-05-23 19:04:03 +0000776 def test_repr(self):
777 raw = self.MockRawIO()
778 b = self.tp(raw)
Serhiy Storchaka521e5862014-07-22 15:00:37 +0300779 clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
Antoine Pitrou716c4442009-05-23 19:04:03 +0000780 self.assertEqual(repr(b), "<%s>" % clsname)
781 raw.name = "dummy"
782 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
783 raw.name = b"dummy"
784 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
785
Antoine Pitrou6be88762010-05-03 16:48:20 +0000786 def test_flush_error_on_close(self):
787 raw = self.MockRawIO()
788 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200789 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000790 raw.flush = bad_flush
791 b = self.tp(raw)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200792 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600793 self.assertTrue(b.closed)
794
795 def test_close_error_on_close(self):
796 raw = self.MockRawIO()
797 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200798 raise OSError('flush')
Benjamin Peterson68623612012-12-20 11:53:11 -0600799 def bad_close():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200800 raise OSError('close')
Benjamin Peterson68623612012-12-20 11:53:11 -0600801 raw.close = bad_close
802 b = self.tp(raw)
803 b.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200804 with self.assertRaises(OSError) as err: # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600805 b.close()
806 self.assertEqual(err.exception.args, ('close',))
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +0300807 self.assertIsInstance(err.exception.__context__, OSError)
Benjamin Peterson68623612012-12-20 11:53:11 -0600808 self.assertEqual(err.exception.__context__.args, ('flush',))
809 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000810
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +0300811 def test_nonnormalized_close_error_on_close(self):
812 # Issue #21677
813 raw = self.MockRawIO()
814 def bad_flush():
815 raise non_existing_flush
816 def bad_close():
817 raise non_existing_close
818 raw.close = bad_close
819 b = self.tp(raw)
820 b.flush = bad_flush
821 with self.assertRaises(NameError) as err: # exception not swallowed
822 b.close()
823 self.assertIn('non_existing_close', str(err.exception))
824 self.assertIsInstance(err.exception.__context__, NameError)
825 self.assertIn('non_existing_flush', str(err.exception.__context__))
826 self.assertFalse(b.closed)
827
Antoine Pitrou6be88762010-05-03 16:48:20 +0000828 def test_multi_close(self):
829 raw = self.MockRawIO()
830 b = self.tp(raw)
831 b.close()
832 b.close()
833 b.close()
834 self.assertRaises(ValueError, b.flush)
835
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000836 def test_unseekable(self):
837 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
838 self.assertRaises(self.UnsupportedOperation, bufio.tell)
839 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
840
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000841 def test_readonly_attributes(self):
842 raw = self.MockRawIO()
843 buf = self.tp(raw)
844 x = self.MockRawIO()
845 with self.assertRaises(AttributeError):
846 buf.raw = x
847
Guido van Rossum78892e42007-04-06 17:31:18 +0000848
class SizeofTest:
    # sys.getsizeof() checks for buffered objects.  Mixin: the concrete
    # test class supplies ``tp`` and ``MockRawIO``.

    @support.cpython_only
    def test_sizeof(self):
        # The reported size must differ by exactly the difference between
        # the two requested buffer sizes.
        small, large = 4096, 8192
        small_bufio = self.tp(self.MockRawIO(), buffer_size=small)
        overhead = sys.getsizeof(small_bufio) - small
        large_bufio = self.tp(self.MockRawIO(), buffer_size=large)
        self.assertEqual(sys.getsizeof(large_bufio), overhead + large)

    @support.cpython_only
    def test_buffer_freeing(self):
        # After close() the reported size no longer includes the buffer.
        bufsize = 4096
        bufio = self.tp(self.MockRawIO(), buffer_size=bufsize)
        size_without_buffer = sys.getsizeof(bufio) - bufsize
        bufio.close()
        self.assertEqual(sys.getsizeof(bufio), size_without_buffer)
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200870
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Tests for a buffered reader.  Subclasses set ``tp`` to the concrete
    # class under test.
    read_mode = "rb"

    def test_constructor(self):
        # __init__ may be re-run with valid buffer sizes; non-positive
        # sizes raise ValueError and the object can be re-initialized
        # afterwards.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_uninitialized(self):
        # An object created with __new__ only can be dropped safely, fails
        # cleanly on read(), and works once __init__ has run.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.read, 0)
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.read(0), b'')

    def test_read(self):
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # read1() serves data from the buffer when it can; ``_reads``
        # counts the reads issued to the raw stream.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        # Short read: only the first byte of ``b`` is overwritten.
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")
        # A None from the raw stream ends the read early.
        rawio = self.MockRawIO((b"abc", None))
        bufio = self.tp(rawio)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"cb")

    def test_readinto1(self):
        buffer_size = 10
        rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
        bufio = self.tp(rawio, buffer_size=buffer_size)
        b = bytearray(2)
        self.assertEqual(bufio.peek(3), b'abc')
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 1)
        self.assertEqual(b[:1], b"c")
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"de")
        self.assertEqual(rawio._reads, 2)
        # Now use a destination larger than the internal buffer.
        b = bytearray(2*buffer_size)
        self.assertEqual(bufio.peek(3), b'fgh')
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(bufio.readinto1(b), 6)
        self.assertEqual(b[:6], b"fghjkl")
        self.assertEqual(rawio._reads, 4)

    def _check_readinto_array(self, method_name):
        # Shared body of test_readinto_array and test_readinto1_array.
        # ``method_name`` is the name of the bufio method to call with the
        # array ("readinto" or "readinto1").
        buffer_size = 60
        data = b"a" * 26
        rawio = self.MockRawIO((data,))
        bufio = self.tp(rawio, buffer_size=buffer_size)

        # Create an array with element size > 1 byte
        b = array.array('i', b'x' * 32)
        # Checked with a unittest assertion rather than a bare ``assert``
        # so the precondition is not stripped when running under -O.
        self.assertNotEqual(len(b), 16)

        # Read into it. We should get as many *bytes* as we can fit into b
        # (which is more than the number of elements)
        n = getattr(bufio, method_name)(b)
        self.assertGreater(n, len(b))

        # Check that old contents of b are preserved
        bm = memoryview(b).cast('B')
        self.assertLess(n, len(bm))
        self.assertEqual(bm[:n], data[:n])
        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))

    def test_readinto_array(self):
        self._check_readinto_array("readinto")

    def test_readinto1_array(self):
        self._check_readinto_array("readinto1")

    def test_readlines(self):
        def bufio():
            # Fresh reader for each call, since readlines() consumes it.
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes passed to bufio.read(), and the
        # read sizes expected in the raw stream's ``read_history``.
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())

    def test_read_past_eof(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            l = list(range(256)) * N
            random.shuffle(l)
            s = bytes(l)
            with self.open(support.TESTFN, "wb") as f:
                f.write(s)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def f():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            s = bufio.read(n)
                            if not s:
                                break
                            # list.append() is atomic
                            results.append(s)
                    except Exception as e:
                        # Record the failure for the main thread, then let
                        # it propagate in this thread.
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                s = b''.join(results)
                for i in range(256):
                    c = bytes([i])
                    self.assertEqual(s.count(c), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        # Overrides the CommonBufferedTests version to also check the
        # behaviour after some data has been read.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        bufio.read(1)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        self.assertRaises(self.UnsupportedOperation, bufio.tell)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            rawio = self.MockRawIO([b"x" * n])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            # Simple case: one raw read is enough to satisfy the request.
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1141
1142
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # BufferedReaderTest and SizeofTest applied to io.BufferedReader, plus
    # tests specific to that implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed re-initialization, read() must raise
        # ValueError as well.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(OSError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The FileIO below creates support.TESTFN on disk; make sure it is
        # removed again whatever the outcome of the test.
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            # Self-reference: only the cycle collector can free ``f``.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        # assertIsNone reports the surviving object on failure, unlike
        # assertTrue(... is None).
        self.assertIsNone(wr(), wr)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1189
1190
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the BufferedReaderTest tests with pyio.BufferedReader as the
    # class under test.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001193
Guido van Rossuma9e20242007-03-08 00:43:48 +00001194
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001195class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1196 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001197
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001198 def test_constructor(self):
1199 rawio = self.MockRawIO()
1200 bufio = self.tp(rawio)
1201 bufio.__init__(rawio)
1202 bufio.__init__(rawio, buffer_size=1024)
1203 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001204 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001205 bufio.flush()
1206 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1207 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1208 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1209 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001210 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001211 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001212 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001213
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001214 def test_uninitialized(self):
1215 bufio = self.tp.__new__(self.tp)
1216 del bufio
1217 bufio = self.tp.__new__(self.tp)
1218 self.assertRaisesRegex((ValueError, AttributeError),
1219 'uninitialized|has no attribute',
1220 bufio.write, b'')
1221 bufio.__init__(self.MockRawIO())
1222 self.assertEqual(bufio.write(b''), 0)
1223
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001224 def test_detach_flush(self):
1225 raw = self.MockRawIO()
1226 buf = self.tp(raw)
1227 buf.write(b"howdy!")
1228 self.assertFalse(raw._write_stack)
1229 buf.detach()
1230 self.assertEqual(raw._write_stack, [b"howdy!"])
1231
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001232 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001233 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001234 writer = self.MockRawIO()
1235 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001236 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001237 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001238
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001239 def test_write_overflow(self):
1240 writer = self.MockRawIO()
1241 bufio = self.tp(writer, 8)
1242 contents = b"abcdefghijklmnop"
1243 for n in range(0, len(contents), 3):
1244 bufio.write(contents[n:n+3])
1245 flushed = b"".join(writer._write_stack)
1246 # At least (total - 8) bytes were implicitly flushed, perhaps more
1247 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001248 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001249
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001250 def check_writes(self, intermediate_func):
1251 # Lots of writes, test the flushed output is as expected.
1252 contents = bytes(range(256)) * 1000
1253 n = 0
1254 writer = self.MockRawIO()
1255 bufio = self.tp(writer, 13)
1256 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1257 def gen_sizes():
1258 for size in count(1):
1259 for i in range(15):
1260 yield size
1261 sizes = gen_sizes()
1262 while n < len(contents):
1263 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001264 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001265 intermediate_func(bufio)
1266 n += size
1267 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001268 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001269
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001270 def test_writes(self):
1271 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001272
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001273 def test_writes_and_flushes(self):
1274 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001275
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001276 def test_writes_and_seeks(self):
1277 def _seekabs(bufio):
1278 pos = bufio.tell()
1279 bufio.seek(pos + 1, 0)
1280 bufio.seek(pos - 1, 0)
1281 bufio.seek(pos, 0)
1282 self.check_writes(_seekabs)
1283 def _seekrel(bufio):
1284 pos = bufio.seek(0, 1)
1285 bufio.seek(+1, 1)
1286 bufio.seek(-1, 1)
1287 bufio.seek(pos, 0)
1288 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001289
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001290 def test_writes_and_truncates(self):
1291 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001292
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001293 def test_write_non_blocking(self):
1294 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001295 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001296
Ezio Melottib3aedd42010-11-20 19:04:17 +00001297 self.assertEqual(bufio.write(b"abcd"), 4)
1298 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001299 # 1 byte will be written, the rest will be buffered
1300 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001301 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001302
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001303 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1304 raw.block_on(b"0")
1305 try:
1306 bufio.write(b"opqrwxyz0123456789")
1307 except self.BlockingIOError as e:
1308 written = e.characters_written
1309 else:
1310 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001311 self.assertEqual(written, 16)
1312 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001313 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001314
Ezio Melottib3aedd42010-11-20 19:04:17 +00001315 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001316 s = raw.pop_written()
1317 # Previously buffered bytes were flushed
1318 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001319
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001320 def test_write_and_rewind(self):
1321 raw = io.BytesIO()
1322 bufio = self.tp(raw, 4)
1323 self.assertEqual(bufio.write(b"abcdef"), 6)
1324 self.assertEqual(bufio.tell(), 6)
1325 bufio.seek(0, 0)
1326 self.assertEqual(bufio.write(b"XY"), 2)
1327 bufio.seek(6, 0)
1328 self.assertEqual(raw.getvalue(), b"XYcdef")
1329 self.assertEqual(bufio.write(b"123456"), 6)
1330 bufio.flush()
1331 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001332
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001333 def test_flush(self):
1334 writer = self.MockRawIO()
1335 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001336 bufio.write(b"abc")
1337 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001338 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001339
Antoine Pitrou131a4892012-10-16 22:57:11 +02001340 def test_writelines(self):
1341 l = [b'ab', b'cd', b'ef']
1342 writer = self.MockRawIO()
1343 bufio = self.tp(writer, 8)
1344 bufio.writelines(l)
1345 bufio.flush()
1346 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1347
1348 def test_writelines_userlist(self):
1349 l = UserList([b'ab', b'cd', b'ef'])
1350 writer = self.MockRawIO()
1351 bufio = self.tp(writer, 8)
1352 bufio.writelines(l)
1353 bufio.flush()
1354 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1355
1356 def test_writelines_error(self):
1357 writer = self.MockRawIO()
1358 bufio = self.tp(writer, 8)
1359 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1360 self.assertRaises(TypeError, bufio.writelines, None)
1361 self.assertRaises(TypeError, bufio.writelines, 'abc')
1362
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001363 def test_destructor(self):
1364 writer = self.MockRawIO()
1365 bufio = self.tp(writer, 8)
1366 bufio.write(b"abc")
1367 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001368 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001369 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001370
1371 def test_truncate(self):
1372 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001373 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001374 bufio = self.tp(raw, 8)
1375 bufio.write(b"abcdef")
1376 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001377 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001378 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001379 self.assertEqual(f.read(), b"abc")
1380
@unittest.skipUnless(threading, 'Threading required for this test.')
@support.requires_resource('cpu')
def test_threads(self):
    try:
        # Write out many bytes from many threads and test they were
        # all flushed.
        N = 1000
        contents = bytes(range(256)) * N
        # Alternate between 1-byte and 19-byte chunks.
        sizes = cycle([1, 19])
        n = 0
        queue = deque()
        while n < len(contents):
            size = next(sizes)
            queue.append(contents[n:n+size])
            n += size
        del contents
        # We use a real file object because it allows us to
        # exercise situations where the GIL is released before
        # writing the buffer to the raw streams. This is in addition
        # to concurrency issues due to switching threads in the middle
        # of Python code.
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            errors = []

            def f():
                # Worker: drain the shared queue into the buffered writer.
                try:
                    while True:
                        try:
                            s = queue.popleft()
                        except IndexError:
                            # Queue exhausted: this worker is done.
                            return
                        bufio.write(s)
                except Exception as e:
                    # Record the failure for the main thread, then let
                    # it propagate in this thread as well.
                    errors.append(e)
                    raise

            threads = [threading.Thread(target=f) for x in range(20)]
            for t in threads:
                t.start()
            time.sleep(0.02)  # yield
            for t in threads:
                t.join()
            self.assertFalse(errors,
                "the following exceptions were caught: %r" % errors)
            bufio.close()
        with self.open(support.TESTFN, "rb") as f:
            s = f.read()
        # Chunks may land in any order, so only per-byte-value counts
        # are checked, not the exact layout.
        for i in range(256):
            self.assertEqual(s.count(bytes([i])), N)
    finally:
        support.unlink(support.TESTFN)
1431
def test_misbehaved_io(self):
    # seek(), tell() and write() on top of a MisbehavedRawIO must each
    # raise OSError.
    rawio = self.MisbehavedRawIO()
    bufio = self.tp(rawio, 5)
    self.assertRaises(OSError, bufio.seek, 0)
    self.assertRaises(OSError, bufio.tell)
    self.assertRaises(OSError, bufio.write, b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001438
def test_max_buffer_size_removal(self):
    # A third positional argument (formerly max_buffer_size) must be
    # rejected with TypeError.
    with self.assertRaises(TypeError):
        self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001442
def test_write_error_on_close(self):
    # An error raised by the raw write during close() must propagate,
    # and the buffered object must still end up closed.
    raw = self.MockRawIO()

    def bad_write(b):
        raise OSError()

    raw.write = bad_write
    b = self.tp(raw)
    b.write(b'spam')
    self.assertRaises(OSError, b.close)  # exception not swallowed
    self.assertTrue(b.closed)
1452
Benjamin Peterson59406a92009-03-26 17:10:29 +00001453
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the shared BufferedWriter tests against io.BufferedWriter and
    # adds checks specific to that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed re-initialization with an invalid buffer
        # size, the object must refuse writes.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            f.write(b"123xxx")
            # Self-reference: creates a cycle, so the object can only be
            # reclaimed by the cyclic collector.
            f.x = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1497
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001498
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriter tests against the pure-Python
    # implementation.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001501
class BufferedRWPairTest(unittest.TestCase):
    # Shared tests for BufferedRWPair.  The class under test is read from
    # the ``tp`` attribute, which concrete subclasses set.

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_uninitialized(self):
        # An object created with __new__ only must be safely deletable,
        # must reject read/write, and must work once __init__ has run.
        pair = self.tp.__new__(self.tp)
        del pair
        pair = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               pair.read, 0)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               pair.write, b'')
        pair.__init__(self.MockRawIO(), self.MockRawIO())
        self.assertEqual(pair.read(0), b'')
        self.assertEqual(pair.write(b''), 0)

    def test_detach(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_removal(self):
        # A fourth positional argument (formerly max_buffer_size) must be
        # rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        # read(None) reads everything, like read() without argument.
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        # A fresh pair is built for every call so each one starts reading
        # from the beginning of the data.
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # peek() may return more than requested, hence startswith(); it
        # must not consume the data, so the following read() sees it too.
        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        # The pair reports isatty() as true when at least one of the two
        # underlying streams does.
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

    def test_weakref_clearing(self):
        # Drop the object first, then its weak reference.
        brw = self.tp(self.MockRawIO(), self.MockRawIO())
        ref = weakref.ref(brw)
        brw = None
        ref = None  # Shouldn't segfault.
1638
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPair tests against io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001641
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPair tests against the pure-Python
    # implementation.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001644
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001645
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Shared tests for BufferedRandom.  Inherits both the reader and the
    # writer tests; tests defined in both bases are overridden here to
    # run both versions explicitly.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_uninitialized(self):
        BufferedReaderTest.test_uninitialized(self)
        BufferedWriterTest.test_uninitialized(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack)  # Buffer writes
        # The following read makes the pending writes reach the raw stream.
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        rw.write(b"123f")
        rw.seek(0, 0)
        self.assertEqual(b"asdf123fl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        rw.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        # Float offsets are rejected.
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario interleaving reads, writes and flushes.
        # read_func(bufio[, n]) performs the read using the method under
        # test and returns the bytes obtained.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        # Modify the raw stream behind the buffered object's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # Without an explicit size, use a buffer large enough to hold
            # the rest of the test data.
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            # peek() does not advance the position; do it by hand so the
            # helper behaves like a read.
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        # First variant: peek at the current position.
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)

        # Second variant: step back one byte, peek, then restore the
        # position.
        def _peek(bufio):
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek)

    def test_writes_and_reads(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            s = raw.getvalue()
            self.assertEqual(s,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        b = b"\x80\x81\x82\x83\x84"
        for i in range(len(b)):
            for j in range(i, len(b)):
                raw = self.BytesIO(b)
                bufio = self.tp(raw, 100)
                mutate(bufio, i, j)
                bufio.flush()
                expected = bytearray(b)
                # Same order as the writes in mutate(): when i == j the
                # second write (value 1) wins.
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA")  # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2)  # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as raw:
            with self.tp(raw, 100) as f:
                f.write(b"1")
                self.assertEqual(f.read(1), b'b')
                f.write(b'2')
                self.assertEqual(f.read1(1), b'd')
                f.write(b'3')
                buf = bytearray(1)
                f.readinto(buf)
                self.assertEqual(buf, b'f')
                f.write(b'4')
                self.assertEqual(f.peek(1), b'h')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as raw:
            with self.tp(raw, 100) as f:
                self.assertEqual(f.read(1), b'a')
                f.write(b"2")
                self.assertEqual(f.read(1), b'c')
                f.flush()
                self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        # Each write overwrites the first byte of the next line; each
        # readline() then returns the remainder of that line.
        with self.BytesIO(b'ab\ncdef\ng\n') as raw:
            with self.tp(raw) as f:
                f.write(b'1')
                self.assertEqual(f.readline(), b'b\n')
                f.write(b'2')
                self.assertEqual(f.readline(), b'def\n')
                f.write(b'3')
                self.assertEqual(f.readline(), b'\n')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None
1873
R David Murray67bfe802013-02-23 21:51:05 -05001874
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    # Runs the shared BufferedRandom tests against io.BufferedRandom and
    # adds checks specific to that implementation.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_garbage_collection(self):
        # Reuse both the reader and the writer garbage-collection tests.
        CBufferedReaderTest.test_garbage_collection(self)
        CBufferedWriterTest.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1896
1897
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the shared BufferedRandom tests against the pure-Python
    # implementation.
    tp = pyio.BufferedRandom
1900
1901
# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
# properties:
#   - A single output character can correspond to many bytes of input.
#   - The number of input bytes to complete the character can be
#     undetermined until the last input byte is received.
#   - The number of input bytes can vary depending on previous input.
#   - A single input byte can correspond to many characters of output.
#   - The number of output characters can be undetermined until the
#     last input byte is received.
#   - The number of output characters can vary depending on previous input.
1912
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        # Back to the initial state: I = O = 1 and no buffered input.
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        # Return (buffered bytes, flags) with I and O packed into one int.
        i, o = self.i ^ 1, self.o ^ 1  # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        # Inverse of getstate(): unpack the flags and undo the XOR.
        buffer, io = state
        self.buffer = bytearray(buffer)
        i, o = divmod(io, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        # Feed the bytes of `input` one at a time and return the text
        # produced by every word completed along the way.
        output = ''
        for b in input:
            if self.i == 0:  # variable-length, terminated with period
                if b == ord('.'):
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else:  # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer:  # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        # Consume the buffered word: either update I or O (no output) or
        # return the formatted output word.  The buffer is emptied in
        # every case.
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o  # pad out with hyphens
            if self.o:
                output = output[:self.o]  # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # Switch read by lookupTestDecoder(); off by default.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        # Codec search function: answers only for 'test_decoder', and only
        # while codecEnabled is true; otherwise falls through and returns
        # None.
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
1997
# Register the previous decoder for testing.
# Disabled by default, tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2001
2002
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for input, eof, output in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(input, eof), output)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002045
2046class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002047
def setUp(self):
    # Sample data mixing "\r\n", "\r" and "\n" line endings, plus the
    # same text with every line ending replaced by "\n".
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
    # Start each test without a leftover test file.
    support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002052
def tearDown(self):
    # Remove the test file a test may have created.
    support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002055
def test_constructor(self):
    # Calling __init__ again on an existing wrapper must replace its
    # encoding and line_buffering settings; invalid newline arguments
    # must be rejected.
    r = self.BytesIO(b"\xc3\xa9\n\n")
    b = self.BufferedReader(r, 1000)
    t = self.TextIOWrapper(b)
    t.__init__(b, encoding="latin-1", newline="\r\n")
    self.assertEqual(t.encoding, "latin-1")
    self.assertEqual(t.line_buffering, False)
    t.__init__(b, encoding="utf-8", line_buffering=True)
    self.assertEqual(t.encoding, "utf-8")
    self.assertEqual(t.line_buffering, True)
    self.assertEqual("\xe9\n", t.readline())
    self.assertRaises(TypeError, t.__init__, b, newline=42)
    self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2069
def test_non_text_encoding_codecs_are_rejected(self):
    # Ensure the constructor complains if passed a codec that isn't
    # marked as a text encoding
    # http://bugs.python.org/issue20404
    r = self.BytesIO()
    b = self.BufferedWriter(r)
    with self.assertRaisesRegex(LookupError, "is not a text encoding"):
        self.TextIOWrapper(b, encoding="hex")
2078
def test_detach(self):
    # detach() returns the underlying buffer.
    r = self.BytesIO()
    b = self.BufferedWriter(r)
    t = self.TextIOWrapper(b)
    self.assertIs(t.detach(), b)

    # detach() flushes pending text first; a second detach() fails.
    t = self.TextIOWrapper(b, encoding="ascii")
    t.write("howdy")
    self.assertFalse(r.getvalue())
    t.detach()
    self.assertEqual(r.getvalue(), b"howdy")
    self.assertRaises(ValueError, t.detach)

    # Operations independent of the detached stream should still work
    repr(t)
    self.assertEqual(t.encoding, "ascii")
    self.assertEqual(t.errors, "strict")
    self.assertFalse(t.line_buffering)
2097
def test_repr(self):
    # repr() shows the name and mode once they are available, and always
    # the encoding.
    raw = self.BytesIO("hello".encode("utf-8"))
    b = self.BufferedReader(raw)
    t = self.TextIOWrapper(b, encoding="utf-8")
    modname = self.TextIOWrapper.__module__
    self.assertEqual(repr(t),
                     "<%s.TextIOWrapper encoding='utf-8'>" % modname)
    raw.name = "dummy"
    self.assertEqual(repr(t),
                     "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
    t.mode = "r"
    self.assertEqual(repr(t),
                     "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
    raw.name = b"dummy"
    self.assertEqual(repr(t),
                     "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)

    t.buffer.detach()
    repr(t)  # Should not raise an exception
2117
def test_line_buffering(self):
    # With line_buffering=True, a write containing "\n" or "\r" flushes
    # everything written so far down to the raw stream.
    r = self.BytesIO()
    b = self.BufferedWriter(r, 1000)
    t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
    t.write("X")
    self.assertEqual(r.getvalue(), b"")  # No flush happened
    t.write("Y\nZ")
    self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed
    t.write("A\rB")
    self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002128
def test_default_encoding(self):
    # TextIOWrapper() without an explicit encoding must pick the current
    # locale encoding, not the user preferred encoding.
    saved_environ = dict(os.environ)
    try:
        # try to get a user preferred encoding different than the current
        # locale encoding by dropping the locale-related variables
        for name in ('LC_ALL', 'LANG', 'LC_CTYPE'):
            os.environ.pop(name, None)

        expected = locale.getpreferredencoding(False)
        wrapper = self.TextIOWrapper(self.BytesIO())
        self.assertEqual(wrapper.encoding, expected)
    finally:
        # Put the environment back exactly as it was found.
        os.environ.clear()
        os.environ.update(saved_environ)
2146
@support.cpython_only
def test_device_encoding(self):
    # Issue 15989: a fileno() result that does not fit in a C int must
    # make the TextIOWrapper constructor raise OverflowError.
    import _testcapi
    stream = self.BytesIO()
    for too_large in (_testcapi.INT_MAX + 1, _testcapi.UINT_MAX + 1):
        stream.fileno = lambda value=too_large: value
        with self.assertRaises(OverflowError):
            self.TextIOWrapper(stream)
2156
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="utf-8")
    self.assertEqual(t.encoding, "utf-8")
    # No explicit encoding: a default must still be reported ...
    t = self.TextIOWrapper(b)
    self.assertIsNotNone(t.encoding)
    # ... and it must name a codec that can be looked up (codecs.lookup
    # raises LookupError for an unknown name).
    codecs.lookup(t.encoding)
2165
def test_encoding_errors_reading(self):
    # Decode a byte that is not valid ASCII under each error handler.
    payload = b"abc\n\xff\n"

    def reader(**options):
        # Fresh wrapper over a fresh copy of the payload for each case.
        return self.TextIOWrapper(self.BytesIO(payload), encoding="ascii",
                                  **options)

    # (1) default
    self.assertRaises(UnicodeError, reader().read)
    # (2) explicit strict
    self.assertRaises(UnicodeError, reader(errors="strict").read)
    # (3) ignore
    self.assertEqual(reader(errors="ignore").read(), "abc\n\n")
    # (4) replace
    self.assertEqual(reader(errors="replace").read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002183
def test_encoding_errors_writing(self):
    # Encode a character that is not representable in ASCII under each
    # error handler.

    # (1) default and (2) explicit strict: the write itself fails
    for options in ({}, {"errors": "strict"}):
        writer = self.TextIOWrapper(self.BytesIO(), encoding="ascii",
                                    **options)
        with self.assertRaises(UnicodeError):
            writer.write("\xff")

    # (3) ignore and (4) replace: the write succeeds with altered output
    lenient = (
        ("ignore", b"abcdef\n"),
        ("replace", b"abc?def\n"),
    )
    for handler, encoded in lenient:
        sink = self.BytesIO()
        writer = self.TextIOWrapper(sink, encoding="ascii", errors=handler,
                                    newline="\n")
        writer.write("abc\xffdef\n")
        writer.flush()
        self.assertEqual(sink.getvalue(), encoded)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002207
def test_newlines(self):
    # Read the same mixed-newline text under every newline mode, several
    # encodings and small buffer sizes, either by iterating over the
    # wrapper or by mixing read(2) with readline().
    input_lines = ["unix\n", "windows\r\n", "os9\r", "last\n", "nonl"]

    # (newline argument, lines expected back)
    tests = [
        [None, ['unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl']],
        ['', input_lines],
        ['\n', ["unix\n", "windows\r\n", "os9\rlast\n", "nonl"]],
        ['\r\n', ["unix\nwindows\r\n", "os9\rlast\nnonl"]],
        ['\r', ["unix\nwindows\r", "\nos9\r", "last\nnonl"]],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        # str.encode() already returns bytes; no extra conversion needed.
        data = ''.join(input_lines).encode(encoding)
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    if do_reads:
                        got_lines = []
                        while True:
                            c2 = textio.read(2)
                            if c2 == '':
                                break
                            self.assertEqual(len(c2), 2)
                            got_lines.append(c2 + textio.readline())
                    else:
                        got_lines = list(textio)

                    # One list comparison covers both content and length
                    # and reports the full difference on failure.
                    self.assertEqual(got_lines, exp_lines)
Guido van Rossum78892e42007-04-06 17:31:18 +00002249
def test_newlines_input(self):
    # For each newline mode, readlines() must split the input as listed
    # and read() must return the concatenation of those same lines.
    raw_bytes = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    universal = raw_bytes.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    cases = [
        (None, universal.decode("ascii").splitlines(keepends=True)),
        ("", raw_bytes.decode("ascii").splitlines(keepends=True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, lines in cases:
        reader = self.TextIOWrapper(self.BytesIO(raw_bytes),
                                    encoding="ascii", newline=newline)
        self.assertEqual(reader.readlines(), lines)
        reader.seek(0)
        self.assertEqual(reader.read(), "".join(lines))
Guido van Rossum8358db22007-08-18 21:39:55 +00002265
def test_newlines_output(self):
    # Maps each explicit newline argument to the bytes expected on output.
    expected_by_newline = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
    }
    # newline=None is expected to behave like the platform's os.linesep.
    cases = [(None, expected_by_newline[os.linesep])]
    cases.extend(sorted(expected_by_newline.items()))
    pieces = ("AAA\nB", "BB\nCCC\n", "X\rY\r\nZ")
    for newline, encoded in cases:
        sink = self.BytesIO()
        writer = self.TextIOWrapper(sink, encoding="ascii", newline=newline)
        for piece in pieces:
            writer.write(piece)
        writer.flush()
        self.assertEqual(sink.closed, False)
        self.assertEqual(sink.getvalue(), encoded)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002283
def test_destructor(self):
    # Dropping the last reference to a TextIOWrapper must close the
    # underlying stream after the pending text has been written to it.
    seen_at_close = []

    class RecordingBytesIO(self.BytesIO):
        def close(self):
            # Capture the contents at the moment close() is invoked.
            seen_at_close.append(self.getvalue())
            super().close()

    stream = RecordingBytesIO()
    wrapper = self.TextIOWrapper(stream, encoding="ascii")
    wrapper.write("abc")
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002297
def test_override_destructor(self):
    # A subclass overriding __del__, close and flush must see them called
    # in that order when the object is destroyed.
    calls = []

    class TracingTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # The base class may not define __del__ at all.
            inherited = getattr(super(), "__del__", None)
            if inherited is not None:
                inherited()

        def close(self):
            calls.append(2)
            super().close()

        def flush(self):
            calls.append(3)
            super().flush()

    stream = self.BytesIO()
    wrapper = TracingTextIO(stream, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2320
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()

    def touch_missing_attribute():
        # The temporary wrapper is discarded as soon as the lookup fails.
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as stderr:
        self.assertRaises(AttributeError, touch_missing_attribute)
    report = stderr.getvalue().strip()
    if not report:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(report.splitlines()), 1)
    self.assertTrue(report.startswith("Exception OSError: "), report)
    self.assertTrue(report.endswith(" ignored"), report)
Guido van Rossum8358db22007-08-18 21:39:55 +00002335
Guido van Rossum9b76da62007-04-11 01:09:03 +00002336 # Systematic tests of the text I/O API
2337
def test_basic_io(self):
    # Write, read, seek and tell on a small text file for a range of
    # decoder chunk sizes and encodings.  The files are opened with
    # "with" so that a failing assertion does not leak an open file.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        # NOTE: "utf-16-be" and "utf-16-le" were commented out of this
        # list in the original code and remain excluded.
        for enc in "ascii", "latin-1", "utf-8":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position at end of file, reused for later seeks.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                # Append, then read back from the old end of file.
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2366
def multi_line_test(self, f, enc):
    # Helper for test_basic_io: rewrite f with lines of various lengths,
    # recording tell() before each write, then check that reading the
    # file back yields the same (position, line) pairs.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    written = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to build a line of `size`.
        body = "".join(sample[i % len(sample)] for i in range(size))
        line = body + "\n"
        written.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    read_back = []
    position, line = f.tell(), f.readline()
    while line:
        read_back.append((position, line))
        position, line = f.tell(), f.readline()
    self.assertEqual(read_back, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002388
def test_telling(self):
    # tell() values recorded while writing must match those seen while
    # reading the same lines back.  The file is opened with "with" so a
    # failing assertion does not leak it.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is expected to fail while iterating over the file.
            self.assertRaises(OSError, f.tell)
        # Once iteration is over, tell() works again.
        self.assertEqual(f.tell(), p2)
2408
def test_seeking(self):
    # Build a line whose multi-byte suffix starts two bytes before the
    # default chunk size, then check tell() and readline() after reading
    # exactly the ASCII prefix.
    prefix_size = _default_chunk_size() - 2
    text_prefix = "a" * prefix_size
    byte_prefix = text_prefix.encode("utf-8")
    self.assertEqual(len(text_prefix), len(byte_prefix))
    text_suffix = "\u8888\n"
    encoded_line = byte_prefix + text_suffix.encode("utf-8")
    with self.open(support.TESTFN, "wb") as out:
        out.write(encoded_line * 2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as src:
        head = src.read(prefix_size)
        self.assertEqual(head, byte_prefix.decode("ascii"))
        self.assertEqual(src.tell(), prefix_size)
        self.assertEqual(src.readline(), text_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002425
def test_seeking_too(self):
    # Regression test for a specific bug: tell() after readline() on a
    # three-byte UTF-8 character with a chunk size of 2.
    encoded = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as out:
        out.write(encoded)
    with self.open(support.TESTFN, "r", encoding="utf-8") as src:
        src._CHUNK_SIZE  # Just test that it exists
        src._CHUNK_SIZE = 2
        src.readline()
        src.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002436
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def check_seek_and_tell(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened with "with" so that a failing assertion
        # does not leave it open.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    # Seeking back to the cookie must restore the
                    # decoder so that the rest reads identically.
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for payload, _, _ in StatefulIncrementalDecoderTest.test_cases:
            check_seek_and_tell(payload)

        # Position each test case so that it crosses a chunk boundary.
        for payload, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(payload)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            check_seek_and_tell(prefix + payload, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002483
def test_encoded_writes(self):
    # Check if the BOM is written only once (see issue1753).
    digits = "1234567890"
    doubled = digits * 2
    codecs_under_test = (
        "utf-16", "utf-16-le", "utf-16-be",
        "utf-32", "utf-32-le", "utf-32-be",
    )
    for encoding in codecs_under_test:
        sink = self.BytesIO()
        stream = self.TextIOWrapper(sink, encoding=encoding)
        # Two separate writes; they must read back as one run of text.
        stream.write(digits)
        stream.write(digits)
        # Read everything back twice, rewinding before each pass.
        for _ in range(2):
            stream.seek(0)
            self.assertEqual(stream.read(), doubled)
        self.assertEqual(sink.getvalue(), doubled.encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002503
def test_unreadable(self):
    # read() on a wrapper whose stream reports readable() as False must
    # raise OSError.
    class NotReadableBytesIO(self.BytesIO):
        def readable(self):
            return False

    wrapper = self.TextIOWrapper(NotReadableBytesIO())
    with self.assertRaises(OSError):
        wrapper.read()
Benjamin Petersona1b49012009-03-31 23:11:32 +00002510
def test_read_one_by_one(self):
    # Reading one character at a time must still translate "\r\n".
    wrapper = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    # iter() with a sentinel stops at the first empty read (end of file).
    collected = "".join(iter(lambda: wrapper.read(1), ""))
    self.assertEqual(collected, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002520
def test_readlines(self):
    # readlines() with no argument, with None, and with a size hint.
    wrapper = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
    every_line = ["AA\n", "BB\n", "CC"]
    self.assertEqual(wrapper.readlines(), every_line)
    for hint, expected in ((None, every_line), (5, ["AA\n", "BB\n"])):
        wrapper.seek(0)
        self.assertEqual(wrapper.readlines(hint), expected)
2528
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002529 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
def test_read_by_chunk(self):
    # make sure "\r\n" straddles 128 char boundary.
    wrapper = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
    # Keep reading 128 characters at a time until an empty read.
    collected = "".join(iter(lambda: wrapper.read(128), ""))
    self.assertEqual(collected, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002540
def test_writelines(self):
    # writelines() with a plain list writes the items back to back.
    sink = self.BytesIO()
    wrapper = self.TextIOWrapper(sink)
    wrapper.writelines(['ab', 'cd', 'ef'])
    wrapper.flush()
    self.assertEqual(sink.getvalue(), b'abcdef')
2548
def test_writelines_userlist(self):
    # writelines() must also accept a UserList, not only a real list.
    sink = self.BytesIO()
    wrapper = self.TextIOWrapper(sink)
    wrapper.writelines(UserList(['ab', 'cd', 'ef']))
    wrapper.flush()
    self.assertEqual(sink.getvalue(), b'abcdef')
2556
def test_writelines_error(self):
    # writelines() rejects non-str items, None and a bytes object.
    wrapper = self.TextIOWrapper(self.BytesIO())
    for invalid in ([1, 2, 3], None, b'abc'):
        with self.assertRaises(TypeError):
            wrapper.writelines(invalid)
2562
def test_issue1395_1(self):
    # read one char at a time; the result must equal self.normalized
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    collected = "".join(iter(lambda: wrapper.read(1), ""))
    self.assertEqual(collected, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002574
def test_issue1395_2(self):
    # read four chars at a time with a chunk size of four
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4
    collected = "".join(iter(lambda: wrapper.read(4), ""))
    self.assertEqual(collected, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002586
def test_issue1395_3(self):
    # two read(4) calls followed by three readline() calls
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    pieces = [wrapper.read(4), wrapper.read(4)]
    for _ in range(3):
        pieces.append(wrapper.readline())
    self.assertEqual("".join(pieces), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002597
def test_issue1395_4(self):
    # read(4) followed by an unbounded read()
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    head = wrapper.read(4)
    tail = wrapper.read()
    self.assertEqual(head + tail, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002605
def test_issue1395_5(self):
    # a tell() cookie taken after read(4) must survive a seek(0) round trip
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    wrapper.read(4)  # skip the first four characters
    cookie = wrapper.tell()
    wrapper.seek(0)
    wrapper.seek(cookie)
    self.assertEqual(wrapper.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002615
def test_issue2282(self):
    # The wrapper must report the same seekable() as its buffer.
    raw = self.BytesIO(self.testdata)
    wrapper = self.TextIOWrapper(raw, encoding="ascii")

    self.assertEqual(raw.seekable(), wrapper.seekable())
2621
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as out:
            out.write('aaa')
            pos = out.tell()
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), 'aaa'.encode(charset))

        with self.open(path, 'a', encoding=charset) as out:
            out.write('xxx')
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002636
def test_seek_bom(self):
    # Same check as for appending, but when seeking manually: writing
    # at a saved position and then at offset 0 must leave one BOM.
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as out:
            out.write('aaa')
            end_of_text = out.tell()
        with self.open(path, 'r+', encoding=charset) as stream:
            stream.seek(end_of_text)
            stream.write('zzz')
            stream.seek(0)
            stream.write('bbb')
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002651
def test_errors_property(self):
    # .errors reports "strict" by default and otherwise the handler given.
    cases = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for options, reported in cases:
        with self.open(support.TESTFN, "w", **options) as stream:
            self.assertEqual(stream.errors, reported)
2657
@support.no_tracing
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    start_signal = threading.Event()
    thread_count = 20
    with self.open(support.TESTFN, "w", buffering=1) as out:
        def worker(index):
            line = "Thread%03d\n" % index
            # Block until every thread has started, then write together.
            start_signal.wait()
            out.write(line)

        workers = [threading.Thread(target=worker, args=(index,))
                   for index in range(thread_count)]
        for thread in workers:
            thread.start()
        time.sleep(0.02)
        start_signal.set()
        for thread in workers:
            thread.join()
    with self.open(support.TESTFN) as src:
        content = src.read()
        # Every thread's line must appear exactly once.
        for index in range(thread_count):
            self.assertEqual(content.count("Thread%03d\n" % index), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002680
def test_flush_error_on_close(self):
    # An error raised by flush() during close() must propagate, and the
    # wrapper must still end up closed.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")

    def failing_flush():
        raise OSError()

    wrapper.flush = failing_flush
    with self.assertRaises(OSError):  # exception not swallowed
        wrapper.close()
    self.assertTrue(wrapper.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00002688
def test_close_error_on_close(self):
    # When both flush() and the buffer's close() fail, the close() error
    # is raised with the flush() error chained as its __context__.
    raw = self.BytesIO(self.testdata)

    def failing_flush():
        raise OSError('flush')

    def failing_close():
        raise OSError('close')

    raw.close = failing_close
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.flush = failing_flush
    with self.assertRaises(OSError) as caught:  # exception not swallowed
        wrapper.close()
    error = caught.exception
    self.assertEqual(error.args, ('close',))
    self.assertIsInstance(error.__context__, OSError)
    self.assertEqual(error.__context__.args, ('flush',))
    self.assertFalse(wrapper.closed)
2704
def test_nonnormalized_close_error_on_close(self):
    # Issue #21677
    # The two helpers deliberately reference undefined names, so each
    # call raises NameError mentioning that name.
    raw = self.BytesIO(self.testdata)

    def failing_flush():
        raise non_existing_flush

    def failing_close():
        raise non_existing_close

    raw.close = failing_close
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.flush = failing_flush
    with self.assertRaises(NameError) as caught:  # exception not swallowed
        wrapper.close()
    error = caught.exception
    self.assertIn('non_existing_close', str(error))
    self.assertIsInstance(error.__context__, NameError)
    self.assertIn('non_existing_flush', str(error.__context__))
    self.assertFalse(wrapper.closed)
2721
def test_multi_close(self):
    # close() may be called repeatedly; flush() afterwards is an error.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    for _ in range(3):
        wrapper.close()
    with self.assertRaises(ValueError):
        wrapper.flush()
2728
def test_unseekable(self):
    # tell() and seek() over a MockUnseekableIO raise UnsupportedOperation.
    wrapper = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.tell()
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.seek(0)
2733
def test_readonly_attributes(self):
    # Assigning to the wrapper's .buffer attribute is rejected.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    replacement = self.BytesIO(self.testdata)
    with self.assertRaises(AttributeError):
        wrapper.buffer = replacement
2739
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    wrapper = self.TextIOWrapper(self.MockRawIO(chunks), encoding='ascii',
                                 newline='\n')
    # Reads
    self.assertEqual(wrapper.read(4), 'abcd')
    self.assertEqual(wrapper.readline(), 'efghi\n')
    remaining = list(wrapper)
    self.assertEqual(remaining, ['jkl\n', 'opq\n'])
2750
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                                 write_through=True)
    for piece in ('1', '23\n4', '5'):
        wrapper.write(piece)
    # No flush() was called, yet the raw object has recorded every byte.
    self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2760
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02002761 def test_bufio_write_through(self):
2762 # Issue #21396: write_through=True doesn't force a flush()
2763 # on the underlying binary buffered object.
2764 flush_called, write_called = [], []
2765 class BufferedWriter(self.BufferedWriter):
2766 def flush(self, *args, **kwargs):
2767 flush_called.append(True)
2768 return super().flush(*args, **kwargs)
2769 def write(self, *args, **kwargs):
2770 write_called.append(True)
2771 return super().write(*args, **kwargs)
2772
2773 rawio = self.BytesIO()
2774 data = b"a"
2775 bufio = BufferedWriter(rawio, len(data)*2)
2776 textio = self.TextIOWrapper(bufio, encoding='ascii',
2777 write_through=True)
2778 # write to the buffered io but don't overflow the buffer
2779 text = data.decode('ascii')
2780 textio.write(text)
2781
2782 # buffer.flush is not called with write_through=True
2783 self.assertFalse(flush_called)
2784 # buffer.write *is* called with write_through=True
2785 self.assertTrue(write_called)
2786 self.assertEqual(rawio.getvalue(), b"") # no flush
2787
2788 write_called = [] # reset
2789 textio.write(text * 10) # total content is larger than bufio buffer
2790 self.assertTrue(write_called)
2791 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
2792
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002793 def test_read_nonbytes(self):
2794 # Issue #17106
2795 # Crash when underlying read() returns non-bytes
2796 t = self.TextIOWrapper(self.StringIO('a'))
2797 self.assertRaises(TypeError, t.read, 1)
2798 t = self.TextIOWrapper(self.StringIO('a'))
2799 self.assertRaises(TypeError, t.readline)
2800 t = self.TextIOWrapper(self.StringIO('a'))
2801 self.assertRaises(TypeError, t.read)
2802
2803 def test_illegal_decoder(self):
2804 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10002805 # Bypass the early encoding check added in issue 20404
2806 def _make_illegal_wrapper():
2807 quopri = codecs.lookup("quopri")
2808 quopri._is_text_encoding = True
2809 try:
2810 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
2811 newline='\n', encoding="quopri")
2812 finally:
2813 quopri._is_text_encoding = False
2814 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002815 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10002816 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002817 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10002818 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002819 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10002820 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002821 self.assertRaises(TypeError, t.read)
2822
    def _check_create_at_shutdown(self, **kwargs):
        # Issue #20037: creating a TextIOWrapper at shutdown
        # shouldn't crash the interpreter.
        #
        # Builds a small script and hands it to assert_python_ok("-c", ...).
        # In the script, the module named by self.io.__name__ is imported
        # as ``io`` and an object ``c`` is created whose __del__ builds a
        # TextIOWrapper (with **kwargs substituted in via str.format) and
        # then prints "ok".  The result of assert_python_ok() is returned
        # unchanged; the callers in this class unpack it as (rc, out, err).
        # NOTE(review): __del__ presumably only runs when the child
        # interpreter shuts down, since ``c`` is a module-level name --
        # confirm.
        iomod = self.io.__name__
        # The leading "if 1:" lets the indented script body be passed
        # to -c as-is.
        code = """if 1:
            import codecs
            import {iomod} as io

            # Avoid looking up codecs at shutdown
            codecs.lookup('utf-8')

            class C:
                def __init__(self):
                    self.buf = io.BytesIO()
                def __del__(self):
                    io.TextIOWrapper(self.buf, **{kwargs})
                    print("ok")
            c = C()
            """.format(iomod=iomod, kwargs=kwargs)
        return assert_python_ok("-c", code)
2843
2844 def test_create_at_shutdown_without_encoding(self):
2845 rc, out, err = self._check_create_at_shutdown()
2846 if err:
2847 # Can error out with a RuntimeError if the module state
2848 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10002849 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01002850 else:
2851 self.assertEqual("ok", out.decode().strip())
2852
2853 def test_create_at_shutdown_with_encoding(self):
2854 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
2855 errors='strict')
2856 self.assertFalse(err)
2857 self.assertEqual("ok", out.decode().strip())
2858
Antoine Pitroub8503892014-04-29 10:14:02 +02002859 def test_read_byteslike(self):
2860 r = MemviewBytesIO(b'Just some random string\n')
2861 t = self.TextIOWrapper(r, 'utf-8')
2862
2863 # TextIOwrapper will not read the full string, because
2864 # we truncate it to a multiple of the native int size
2865 # so that we can construct a more complex memoryview.
2866 bytes_val = _to_memoryview(r.getvalue()).tobytes()
2867
2868 self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
2869
Benjamin Peterson6c14f232014-11-12 10:19:46 -05002870 def test_issue22849(self):
2871 class F(object):
2872 def readable(self): return True
2873 def writable(self): return True
2874 def seekable(self): return True
2875
2876 for i in range(10):
2877 try:
2878 self.TextIOWrapper(F(), encoding='utf-8')
2879 except Exception:
2880 pass
2881
2882 F.tell = lambda x: 0
2883 t = self.TextIOWrapper(F(), encoding='utf-8')
2884
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002885
class MemviewBytesIO(io.BytesIO):
    '''BytesIO variant whose read() and read1() hand back memoryviews
       (built by _to_memoryview) instead of bytes objects'''

    def read1(self, len_):
        chunk = super().read1(len_)
        return _to_memoryview(chunk)

    def read(self, len_):
        chunk = super().read(len_)
        return _to_memoryview(chunk)
2895
2896def _to_memoryview(buf):
2897 '''Convert bytes-object *buf* to a non-trivial memoryview'''
2898
2899 arr = array.array('i')
2900 idx = len(buf) - len(buf) % arr.itemsize
2901 arr.frombytes(buf[:idx])
2902 return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002903
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05002904
class CTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapper tests with ``io`` as the module under
    # test and adds checks specific to that implementation.
    io = io
    shutdown_error = "RuntimeError: could not find io module state"

    def test_initialization(self):
        # A failed re-initialisation must leave the object unusable
        # (ValueError on read) rather than in a half-built state.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        with self.assertRaises(TypeError):
            wrapper.__init__(buffered, newline=42)
        with self.assertRaises(ValueError):
            wrapper.read()
        with self.assertRaises(ValueError):
            wrapper.__init__(buffered, newline='xyzzy')
        with self.assertRaises(ValueError):
            wrapper.read()

        # repr() of an object created without __init__ must raise.
        bare = self.TextIOWrapper.__new__(self.TextIOWrapper)
        with self.assertRaises(Exception):
            repr(bare)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            raw_file = io.FileIO(support.TESTFN, "wb")
            buffered = self.BufferedWriter(raw_file)
            wrapper = self.TextIOWrapper(buffered, encoding="ascii")
            wrapper.write("456def")
            # Self-reference: only the cyclic collector can free the
            # wrapper once the local name is gone.
            wrapper.x = wrapper
            ref = weakref.ref(wrapper)
            del wrapper
            support.gc_collect()
        self.assertIsNone(ref(), ref)
        with self.open(support.TESTFN, "rb") as stored:
            self.assertEqual(stored.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references
            text_a.buddy = text_b
            text_b.buddy = text_a
        support.gc_collect()
2951
2952
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapper tests with ``pyio`` as the module
    # under test.
    # NOTE(review): ``pyio`` is presumably the pure-Python implementation
    # imported elsewhere in this file -- confirm.
    io = pyio
    # Substring that test_create_at_shutdown_without_encoding accepts in
    # the child's stderr.  The commented-out line below is an alternative
    # message that is not currently in use.
    #shutdown_error = "LookupError: unknown encoding: ascii"
    shutdown_error = "TypeError: 'NoneType' object is not iterable"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002957
2958
class IncrementalNewlineDecoderTest(unittest.TestCase):
    # Shared checks for IncrementalNewlineDecoder.
    # NOTE(review): self.IncrementalNewlineDecoder is not assigned in this
    # class; presumably it is injected per implementation elsewhere in the
    # module -- confirm.

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def _check_decode(data, expected, **kwargs):
            # We exercise getstate() / setstate() as well as decode():
            # decode once, rewind to the saved state, decode again.
            saved = decoder.getstate()
            first = decoder.decode(data, **kwargs)
            self.assertEqual(first, expected)
            decoder.setstate(saved)
            second = decoder.decode(data, **kwargs)
            self.assertEqual(second, expected)

        def _run(steps):
            # Each step is (input bytes, expected text, pass final=True?).
            for data, expected, final in steps:
                options = {"final": True} if final else {}
                _check_decode(data, expected, **options)

        _run([
            # One three-byte character, whole ...
            (b'\xe8\xa2\x88', "\u8888", False),
            # ... then fed byte by byte, twice in a row ...
            (b'\xe8', "", False),
            (b'\xa2', "", False),
            (b'\x88', "\u8888", False),
            (b'\xe8', "", False),
            (b'\xa2', "", False),
            (b'\x88', "\u8888", False),
            # ... then only its first byte.
            (b'\xe8', "", False),
        ])
        # Finishing the stream with an incomplete character is an error.
        with self.assertRaises(UnicodeDecodeError):
            decoder.decode(b'', final=True)

        decoder.reset()
        _run([
            (b'\n', "\n", False),
            (b'\r', "", False),
            (b'', "\n", True),
            (b'\r', "\n", True),

            (b'\r', "", False),
            (b'a', "\na", False),

            (b'\r\r\n', "\n\n", False),
            (b'\r', "", False),
            (b'\r', "\n", False),
            (b'\na', "\na", False),

            (b'\xe8\xa2\x88\r\n', "\u8888\n", False),
            (b'\xe8\xa2\x88', "\u8888", False),
            (b'\n', "\n", False),
            (b'\xe8\xa2\x88\r', "\u8888", False),
            (b'\n', "\n", False),
        ])

    def check_newline_decoding(self, decoder, encoding):
        # Feed text to *decoder* one unit at a time and follow the
        # ``newlines`` attribute.  With an encoding the text is encoded
        # first and fed bytewise; with encoding=None it is fed one
        # character at a time.
        pieces = []
        if encoding is None:
            encoder = None
        else:
            encoder = codecs.getincrementalencoder(encoding)()

        def _decode_bytewise(text):
            if encoder is None:
                # Decode one char at a time
                units = text
            else:
                # Decode one byte at a time
                units = (bytes([value]) for value in encoder.encode(text))
            for unit in units:
                pieces.append(decoder.decode(unit))

        self.assertEqual(decoder.newlines, None)
        # (text to feed, value of decoder.newlines expected afterwards)
        stages = [
            ("abc\n\r", '\n'),
            ("\nabc", ('\n', '\r\n')),
            ("abc\r", ('\n', '\r\n')),
            ("abc", ('\r', '\n', '\r\n')),
        ]
        for text, seen_newlines in stages:
            _decode_bytewise(text)
            self.assertEqual(decoder.newlines, seen_newlines)
        _decode_bytewise("abc\r")
        self.assertEqual("".join(pieces), "abc\n\nabcabc\nabcabc")

        # After reset() the decoder must behave like a new one.
        decoder.reset()
        sample = "abc"
        if encoder is not None:
            encoder.reset()
            sample = encoder.encode(sample)
        self.assertEqual(decoder.decode(sample), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc is None:
                inner = None
            else:
                inner = codecs.getincrementaldecoder(enc)()
            wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(wrapped, enc)
        utf8_inner = codecs.getincrementaldecoder("utf-8")()
        utf8_wrapped = self.IncrementalNewlineDecoder(utf8_inner,
                                                      translate=True)
        self.check_newline_decoding_utf8(utf8_wrapped)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def _check(dec):
            # Neither character may be taken for a newline.
            self.assertEqual(dec.newlines, None)
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)

        for translate in (False, True):
            _check(self.IncrementalNewlineDecoder(None, translate=translate))
3063 _check(dec)
3064
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds no tests of its own; everything is inherited from
    # IncrementalNewlineDecoderTest.
    # NOTE(review): self.IncrementalNewlineDecoder is presumably bound to
    # the C implementation elsewhere in this module -- confirm.
    pass
3067
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds no tests of its own; everything is inherited from
    # IncrementalNewlineDecoderTest.
    # NOTE(review): self.IncrementalNewlineDecoder is presumably bound to
    # the pure-Python implementation elsewhere in this module -- confirm.
    pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003070
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003071
Guido van Rossum01a27522007-03-07 01:00:12 +00003072# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003073
class MiscIOTest(unittest.TestCase):
    # Miscellaneous checks of the io module surface.
    # NOTE(review): self.io is set by the subclasses below; the other names
    # used here (self.open, self.IOBase, self.BlockingIOError, ...) are not
    # assigned in this class and are presumably injected per implementation
    # elsewhere in the module -- confirm.

    def tearDown(self):
        # The tests share support.TESTFN; remove it after each one.
        support.unlink(support.TESTFN)

    def test___all__(self):
        # Every exported name must exist; apart from open() and the SEEK_*
        # constants it must be an exception or an IOBase subclass.
        for name in self.io.__all__:
            exported = getattr(self.io, name, None)
            self.assertIsNotNone(exported, name)
            if name == "open":
                continue
            looks_like_error = ("error" in name.lower()
                                or name == "UnsupportedOperation")
            if looks_like_error:
                self.assertTrue(issubclass(exported, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(exported, self.IOBase))

    def test_attributes(self):
        path = support.TESTFN

        unbuffered = self.open(path, "wb", buffering=0)
        self.assertEqual(unbuffered.mode, "wb")
        unbuffered.close()

        with support.check_warnings(('', DeprecationWarning)):
            text = self.open(path, "U")
        # All three layers report the same name ...
        for layer in (text, text.buffer, text.buffer.raw):
            self.assertEqual(layer.name, path)
        # ... but only the text layer keeps the "U" mode string.
        self.assertEqual(text.mode, "U")
        for layer in (text.buffer, text.buffer.raw):
            self.assertEqual(layer.mode, "rb")
        text.close()

        f = self.open(path, "w+")
        self.assertEqual(f.mode, "w+")
        self.assertEqual(f.buffer.mode, "rb+")  # Does it really matter?
        self.assertEqual(f.buffer.raw.mode, "rb+")

        # A second object on the same descriptor: its name is the fd.
        g = self.open(f.fileno(), "wb", closefd=False)
        for layer in (g, g.raw):
            self.assertEqual(layer.mode, "wb")
        for layer in (g, g.raw):
            self.assertEqual(layer.name, f.fileno())
        f.close()
        g.close()

    def test_io_after_close(self):
        # Every I/O method must raise ValueError once the file is closed.
        open_variants = [
            {"mode": "w"},
            {"mode": "wb"},
            {"mode": "w", "buffering": 1},
            {"mode": "w", "buffering": 2},
            {"mode": "wb", "buffering": 0},
            {"mode": "r"},
            {"mode": "rb"},
            {"mode": "r", "buffering": 1},
            {"mode": "r", "buffering": 2},
            {"mode": "rb", "buffering": 0},
            {"mode": "w+"},
            {"mode": "w+b"},
            {"mode": "w+", "buffering": 1},
            {"mode": "w+", "buffering": 2},
            {"mode": "w+b", "buffering": 0},
        ]
        for kwargs in open_variants:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            # write() needs an argument matching the file's text/binary
            # kind.
            nothing = b"" if "b" in kwargs['mode'] else ""
            # (method name, call arguments, present on every layer?)
            probes = [
                ("flush", (), True),
                ("fileno", (), True),
                ("isatty", (), True),
                ("__iter__", (), True),
                ("peek", (1,), False),
                ("read", (), True),
                ("read1", (1024,), False),
                ("readall", (), False),
                ("readinto", (bytearray(1024),), False),
                ("readinto1", (bytearray(1024),), False),
                ("readline", (), True),
                ("readlines", (), True),
                ("seek", (0,), True),
                ("tell", (), True),
                ("truncate", (), True),
                ("write", (nothing,), True),
                ("writelines", ([],), True),
            ]
            for method_name, call_args, always_present in probes:
                if always_present or hasattr(f, method_name):
                    self.assertRaises(ValueError,
                                      getattr(f, method_name), *call_args)
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        class Text(str):
            pass
        text = Text("")
        error = self.BlockingIOError(1, text)
        # Reference cycle between the exception and its argument; both
        # must still be collectable.
        text.b = error
        error.c = text
        ref = weakref.ref(text)
        del text, error
        support.gc_collect()
        self.assertIsNone(ref(), ref)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        for name in ("IOBase", "RawIOBase", "BufferedIOBase", "TextIOBase"):
            self.assertIsInstance(getattr(self, name), abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        # Each kind of file object must be an instance of IOBase and of
        # exactly one of the three layer ABCs taken from *abcmodule*.
        layers = ("RawIOBase", "BufferedIOBase", "TextIOBase")
        # (mode, extra open() keywords, the one layer ABC that applies)
        variants = [
            ("wb", {"buffering": 0}, "RawIOBase"),
            ("wb", {}, "BufferedIOBase"),
            ("w", {}, "TextIOBase"),
        ]
        for mode, extra, matching in variants:
            with self.open(support.TESTFN, mode, **extra) as f:
                self.assertIsInstance(f, abcmodule.IOBase)
                for layer in layers:
                    layer_abc = getattr(abcmodule, layer)
                    if layer == matching:
                        self.assertIsInstance(f, layer_abc)
                    else:
                        self.assertNotIsInstance(f, layer_abc)

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)

    def _check_warn_on_dealloc(self, *args, **kwargs):
        # Dropping the last reference to an unclosed file must emit a
        # ResourceWarning whose message contains the file's repr().
        # Note: this goes through the builtin open(), not self.open.
        stream = open(*args, **kwargs)
        description = repr(stream)
        with self.assertWarns(ResourceWarning) as cm:
            del stream
            support.gc_collect()
        message = str(cm.warning.args[0])
        self.assertIn(description, message)

    def test_warn_on_dealloc(self):
        variants = (
            ("wb", {"buffering": 0}),
            ("wb", {}),
            ("w", {}),
        )
        for mode, extra in variants:
            self._check_warn_on_dealloc(support.TESTFN, mode, **extra)

    def _check_warn_on_dealloc_fd(self, *args, **kwargs):
        # Same check as _check_warn_on_dealloc, but on pipe descriptors.
        opened_fds = []

        def cleanup_fds():
            # Close every pipe end; ends that are already closed (EBADF)
            # are fine.
            for fd in opened_fds:
                try:
                    os.close(fd)
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise
        self.addCleanup(cleanup_fds)

        read_end, write_end = os.pipe()
        opened_fds.extend((read_end, write_end))
        self._check_warn_on_dealloc(read_end, *args, **kwargs)
        # When using closefd=False, there's no warning
        read_end, write_end = os.pipe()
        opened_fds.extend((read_end, write_end))
        with warnings.catch_warnings(record=True) as recorded:
            open(read_end, *args, closefd=False, **kwargs)
            support.gc_collect()
        self.assertEqual(recorded, [])

    def test_warn_on_dealloc_fd(self):
        variants = (
            ("rb", {"buffering": 0}),
            ("rb", {}),
            ("r", {}),
        )
        for mode, extra in variants:
            self._check_warn_on_dealloc_fd(mode, **extra)

    def test_pickling(self):
        # Pickling file objects is forbidden
        open_variants = [
            {"mode": "w"},
            {"mode": "wb"},
            {"mode": "wb", "buffering": 0},
            {"mode": "r"},
            {"mode": "rb"},
            {"mode": "rb", "buffering": 0},
            {"mode": "w+"},
            {"mode": "w+b"},
            {"mode": "w+b", "buffering": 0},
        ]
        protocols = range(pickle.HIGHEST_PROTOCOL + 1)
        for kwargs in open_variants:
            for protocol in protocols:
                with self.open(support.TESTFN, **kwargs) as f:
                    with self.assertRaises(TypeError):
                        pickle.dumps(f, protocol)

    def test_nonblock_pipe_write_bigbuf(self):
        bufsize = 16 * 1024
        self._test_nonblock_pipe_write(bufsize)

    def test_nonblock_pipe_write_smallbuf(self):
        bufsize = 1024
        self._test_nonblock_pipe_write(bufsize)

    @unittest.skipUnless(hasattr(os, 'set_blocking'),
                         'os.set_blocking() required for this test')
    def _test_nonblock_pipe_write(self, bufsize):
        # Push data through a non-blocking pipe using buffered objects
        # of size *bufsize* and check that everything written (after
        # trimming partial writes) is read back unchanged.
        sent = []
        received = []
        r, w = os.pipe()
        for fd in (r, w):
            os.set_blocking(fd, False)

        # To exercise all code paths in the C implementation we need
        # to play with buffer sizes.  For instance, if we choose a
        # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
        # then we will never get a partial write of the buffer.
        rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
        wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

        with rf, wf:
            for chunk_len in (9999, 73, 7574):
                try:
                    # Keep writing runs of one lowercase letter until
                    # the write would block.
                    seq = 0
                    while True:
                        chunk = bytes([seq % 26 + 97]) * chunk_len
                        sent.append(chunk)
                        wf.write(chunk)
                        seq += 1

                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.args[2], e.characters_written)
                    # Only part of the last chunk was accepted.
                    sent[-1] = sent[-1][:e.characters_written]
                    received.append(rf.read())
                    marker = b'BLOCKED'
                    wf.write(marker)
                    sent.append(marker)

            while True:
                try:
                    wf.flush()
                    break
                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.args[2], e.characters_written)
                    self.assertEqual(e.characters_written, 0)
                    received.append(rf.read())

            # Collect what is left until read() returns None.
            received.extend(iter(rf.read, None))

        sent, received = b''.join(sent), b''.join(received)
        self.assertTrue(sent == received)
        self.assertTrue(wf.closed)
        self.assertTrue(rf.closed)

    def test_create_fail(self):
        # 'x' mode fails if file is existing
        with self.open(support.TESTFN, 'w'):
            pass
        with self.assertRaises(FileExistsError):
            self.open(support.TESTFN, 'x')

    def test_create_writes(self):
        # 'x' mode opens for writing
        payload = b"spam"
        with self.open(support.TESTFN, 'xb') as created:
            created.write(payload)
        with self.open(support.TESTFN, 'rb') as reopened:
            self.assertEqual(b"spam", reopened.read())

    def test_open_allargs(self):
        # there used to be a buffer overflow in the parser for rawmode
        with self.assertRaises(ValueError):
            self.open(support.TESTFN, 'rwax+')
3340
3341
class CMiscIOTest(MiscIOTest):
    # Runs the MiscIOTest checks with ``io`` as the module under test.
    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025
        class BadReader(self.io.BufferedIOBase):
            # read() ignores the requested size and returns a huge result.
            def read(self, n=-1):
                return b'x' * 10**6

        reader = BadReader()
        target = bytearray(2)
        # The oversized read() result must be rejected, not copied into
        # the two-byte target.
        with self.assertRaises(ValueError):
            reader.readinto(target)
3352 self.assertRaises(ValueError, bufio.readinto, b)
3353
class PyMiscIOTest(MiscIOTest):
    # Runs the MiscIOTest checks with ``pyio`` as the module under test.
    # NOTE(review): ``pyio`` is presumably the pure-Python implementation
    # imported elsewhere in this file -- confirm.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003356
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003357
3358@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3359class SignalsTest(unittest.TestCase):
3360
3361 def setUp(self):
3362 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
3363
3364 def tearDown(self):
3365 signal.signal(signal.SIGALRM, self.oldalrm)
3366
    def alarm_interrupt(self, sig, frame):
        # SIGALRM handler installed by setUp().  It deliberately raises
        # ZeroDivisionError so that a test can tell the handler ran.
        1/0
3369
3370 @unittest.skipUnless(threading, 'Threading required for this test.')
3371 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
3372 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00003373 invokes the signal handler, and bubbles up the exception raised
3374 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003375 read_results = []
3376 def _read():
Victor Stinnera9293352011-04-30 15:21:58 +02003377 if hasattr(signal, 'pthread_sigmask'):
3378 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003379 s = os.read(r, 1)
3380 read_results.append(s)
3381 t = threading.Thread(target=_read)
3382 t.daemon = True
3383 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00003384 fdopen_kwargs["closefd"] = False
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003385 try:
3386 wio = self.io.open(w, **fdopen_kwargs)
3387 t.start()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003388 # Fill the pipe enough that the write will be blocking.
3389 # It will be interrupted by the timer armed above. Since the
3390 # other thread has read one byte, the low-level write will
3391 # return with a successful (partial) result rather than an EINTR.
3392 # The buffered IO layer must check for pending signal
3393 # handlers, which in this case will invoke alarm_interrupt().
Victor Stinner775b2dd2013-07-15 19:53:13 +02003394 signal.alarm(1)
3395 try:
3396 self.assertRaises(ZeroDivisionError,
3397 wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
3398 finally:
3399 signal.alarm(0)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003400 t.join()
3401 # We got one byte, get another one and check that it isn't a
3402 # repeat of the first one.
3403 read_results.append(os.read(r, 1))
3404 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3405 finally:
3406 os.close(w)
3407 os.close(r)
3408 # This is deliberate. If we didn't close the file descriptor
3409 # before closing wio, wio would try to flush its internal
3410 # buffer, and block again.
3411 try:
3412 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003413 except OSError as e:
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003414 if e.errno != errno.EBADF:
3415 raise
3416
3417 def test_interrupted_write_unbuffered(self):
3418 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3419
3420 def test_interrupted_write_buffered(self):
3421 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3422
Victor Stinner6ab72862014-09-03 23:32:28 +02003423 # Issue #22331: The test hangs on FreeBSD 7.2
3424 @support.requires_freebsd_version(8)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003425 def test_interrupted_write_text(self):
3426 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3427
@support.no_tracing
def check_reentrant_write(self, data, **fdopen_kwargs):
    """Check that a write() re-entered from a signal handler fails cleanly.

    A SIGALRM handler calls ``wio.write(data)`` and then divides by zero
    while the main loop is busy writing and flushing the same stream.
    The loop must end with either ZeroDivisionError (raised by the
    handler) or a RuntimeError whose message starts with "reentrant call".

    *data* is the item written on every iteration; *fdopen_kwargs* are
    passed through to ``self.io.open()`` for the write end of a pipe.
    """
    def on_alarm(*args):
        # Will be called reentrantly from the same thread
        wio.write(data)
        1/0
    signal.signal(signal.SIGALRM, on_alarm)
    r, w = os.pipe()
    wio = self.io.open(w, **fdopen_kwargs)
    try:
        signal.alarm(1)
        # Either the reentrant call to wio.write() fails with RuntimeError,
        # or the signal handler raises ZeroDivisionError.
        with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
            while 1:
                for i in range(100):
                    wio.write(data)
                    wio.flush()
                # Make sure the buffer doesn't fill up and block further writes
                os.read(r, len(data) * 100)
        exc = cm.exception
        if isinstance(exc, RuntimeError):
            self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
    finally:
        # Cancel any alarm that is still pending so the handler cannot
        # fire during (or after) the cleanup below.
        signal.alarm(0)
        try:
            wio.close()
        finally:
            # Release the read end even if closing wio raised.
            os.close(r)
3454
3455 def test_reentrant_write_buffered(self):
3456 self.check_reentrant_write(b"xy", mode="wb")
3457
3458 def test_reentrant_write_text(self):
3459 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3460
def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
    """Check that a buffered read, when it gets interrupted (either
    returning a partial result or EINTR), properly invokes the signal
    handler and retries if the latter returned successfully.

    *decode* converts the value returned by ``rio.read(6)`` to a str so
    it can be compared with "foobar"; *fdopen_kwargs* are passed to
    ``self.io.open()`` for the read end of a pipe (``closefd`` is forced
    to False, so the descriptors are closed explicitly here).
    """
    r, w = os.pipe()
    fdopen_kwargs["closefd"] = False
    def alarm_handler(sig, frame):
        os.write(w, b"bar")
    signal.signal(signal.SIGALRM, alarm_handler)
    # Bound before the try block so the cleanup never hits an unbound
    # name (which would hide the real error) when open() itself fails.
    rio = None
    try:
        rio = self.io.open(r, **fdopen_kwargs)
        os.write(w, b"foo")
        signal.alarm(1)
        # Expected behaviour:
        # - first raw read() returns partial b"foo"
        # - second raw read() returns EINTR
        # - third raw read() returns b"bar"
        self.assertEqual(decode(rio.read(6)), "foobar")
    finally:
        # Cancel a still-pending alarm so the handler cannot write to a
        # descriptor that is about to be closed.
        signal.alarm(0)
        try:
            if rio is not None:
                rio.close()
        finally:
            os.close(w)
            os.close(r)
3483
Antoine Pitrou20db5112011-08-19 20:32:34 +02003484 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003485 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3486 mode="rb")
3487
Antoine Pitrou20db5112011-08-19 20:32:34 +02003488 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003489 self.check_interrupted_read_retry(lambda x: x,
3490 mode="r")
3491
@unittest.skipUnless(threading, 'Threading required for this test.')
def check_interrupted_write_retry(self, item, **fdopen_kwargs):
    """Check that a buffered write, when it gets interrupted (either
    returning a partial result or EINTR), properly invokes the signal
    handler and retries if the latter returned successfully.

    *item* is a one-element str or bytes that is repeated N times to
    build the payload; *fdopen_kwargs* are passed to ``self.io.open()``
    for the write end of a pipe (``closefd`` is forced to False).
    """
    select = support.import_module("select")
    # A quantity that exceeds the buffer size of an anonymous pipe's
    # write end.
    N = support.PIPE_MAX_SIZE
    r, w = os.pipe()
    fdopen_kwargs["closefd"] = False
    # We need a separate thread to read from the pipe and allow the
    # write() to finish.  This thread is started after the SIGALRM is
    # received (forcing a first EINTR in write()).
    read_results = []
    write_finished = False
    def _read():
        while not write_finished:
            while r in select.select([r], [], [], 1.0)[0]:
                s = os.read(r, 1024)
                read_results.append(s)
    t = threading.Thread(target=_read)
    t.daemon = True
    def alarm1(sig, frame):
        # First alarm: re-arm with the handler that starts the reader.
        signal.signal(signal.SIGALRM, alarm2)
        signal.alarm(1)
    def alarm2(sig, frame):
        t.start()
    signal.signal(signal.SIGALRM, alarm1)
    # Bound before the try block so the cleanup never hits an unbound
    # name (which would hide the real error) when open() itself fails.
    wio = None
    try:
        wio = self.io.open(w, **fdopen_kwargs)
        signal.alarm(1)
        # Expected behaviour:
        # - first raw write() is partial (because of the limited pipe buffer
        #   and the first alarm)
        # - second raw write() returns EINTR (because of the second alarm)
        # - subsequent write()s are successful (either partial or complete)
        self.assertEqual(N, wio.write(item * N))
        wio.flush()
        write_finished = True
        t.join()
        self.assertEqual(N, sum(len(x) for x in read_results))
    finally:
        # Cancel a still-pending alarm so neither handler can run (and
        # possibly start the reader thread) once the pipe is closed.
        signal.alarm(0)
        write_finished = True
        os.close(w)
        os.close(r)
        # This is deliberate. If we didn't close the file descriptor
        # before closing wio, wio would try to flush its internal
        # buffer, and could block (in case of failure).
        if wio is not None:
            try:
                wio.close()
            except OSError as e:
                if e.errno != errno.EBADF:
                    raise
3546
Antoine Pitrou20db5112011-08-19 20:32:34 +02003547 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003548 self.check_interrupted_write_retry(b"x", mode="wb")
3549
Antoine Pitrou20db5112011-08-19 20:32:34 +02003550 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003551 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3552
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003553
class CSignalsTest(SignalsTest):
    # Run the inherited signal tests against the module bound to the
    # global name ``io``.
    io = io
3556
class PySignalsTest(SignalsTest):
    # Run the inherited signal tests against the module bound to the
    # global name ``pyio``.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = test_reentrant_write_text = None
3564
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003565
def load_tests(*args):
    """Build the test suite for this module.

    Every test class whose name starts with "C" receives, as class
    attributes, the members of ``io`` listed in ``io.__all__`` (plus
    ``IncrementalNewlineDecoder``) and the "C"-prefixed mock classes;
    classes whose name starts with "Py" receive the ``pyio`` equivalents.
    Other classes are left untouched.

    The positional arguments are accepted and ignored.

    Returns a ``unittest.TestSuite`` holding one sub-suite per test class.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name: getattr(io, name) for name in all_members}
    py_io_ns = {name: getattr(pyio, name) for name in all_members}
    globs = globals()
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            namespace = c_io_ns
        elif test.__name__.startswith("Py"):
            namespace = py_io_ns
        else:
            # Implementation-independent test class: nothing to inject.
            continue
        for name, obj in namespace.items():
            setattr(test, name, obj)

    # unittest.makeSuite() is deprecated (and removed in Python 3.13); the
    # default loader collects the same "test*" methods in the same order.
    load_case = unittest.defaultTestLoader.loadTestsFromTestCase
    return unittest.TestSuite([load_case(test) for test in tests])
Guido van Rossum28524c72007-02-27 05:47:44 +00003601
if __name__ == "__main__":
    # Executed as a script: hand control to the unittest command-line runner.
    unittest.main()