blob: a424f760f7a2a057a38503e69c30cbe0fcd0a869 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)
12
################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Antoine Pitrou712cb732013-12-21 15:51:54 +010038from test.script_helper import assert_python_ok
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010047try:
48 import fcntl
49except ImportError:
50 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000051
def _default_chunk_size():
    """Return the ``_CHUNK_SIZE`` of a freshly opened text-mode file."""
    # Any readable text file works as a probe; this module's own source is
    # always at hand.
    with open(__file__, "r", encoding="latin-1") as probe:
        chunk_size = probe._CHUNK_SIZE
    return chunk_size
56
57
class MockRawIOWithoutRead:
    """Scripted raw stream that deliberately lacks a read() method.

    Leaving read() out means the default RawIO.read(), which is built on
    readinto(), is the code path that gets exercised.

    ``read_stack`` is an iterable of chunks handed out in order by
    readinto(); a ``None`` entry makes readinto() return ``None`` once.
    Everything passed to write() is recorded in ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        self._reads = 0              # number of readinto() calls
        self._extraneous_reads = 0   # readinto() calls made after exhaustion
        self._write_stack = []
        self._read_stack = list(read_stack)

    def write(self, b):
        snapshot = bytes(b)
        self._write_stack.append(snapshot)
        # Report the length of the object as passed in, not of the snapshot.
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        # Not a real position, but callers need some integer back.
        return 0

    def tell(self):
        # Same caveat as seek(): a placeholder value.
        return 0

    def readinto(self, buf):
        self._reads += 1
        capacity = len(buf)
        pending = self._read_stack
        if not pending:
            # Script exhausted: count the call and report end of stream.
            self._extraneous_reads += 1
            return 0

        chunk = pending[0]
        if chunk is None:
            # A None entry is consumed and passed straight through.
            pending.pop(0)
            return None

        size = len(chunk)
        if size > capacity:
            # Chunk does not fit: fill the buffer and keep the remainder
            # at the head of the script for the next call.
            buf[:] = chunk[:capacity]
            pending[0] = chunk[capacity:]
            return capacity

        pending.pop(0)
        buf[:size] = chunk
        return size

    def truncate(self, pos=None):
        return pos
113
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead combined with the C implementation's RawIOBase."""


class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead combined with the pure-Python RawIOBase."""
119
120
class MockRawIO(MockRawIOWithoutRead):
    """Scripted raw stream that also provides its own read().

    read() ignores the requested size and simply hands out the next entry
    of the read script unchanged (including ``None`` entries).
    """

    def read(self, n=None):
        """Return the next scripted chunk, or ``b""`` once the script is empty.

        Every call is counted in ``_reads``; calls made after the script is
        exhausted are additionally counted in ``_extraneous_reads``.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted script is expected here. A bare ``except``
            # would also hide KeyboardInterrupt/SystemExit and genuine bugs.
            self._extraneous_reads += 1
            return b""
130
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO combined with the C implementation's RawIOBase."""


class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO combined with the pure-Python RawIOBase."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000136
Guido van Rossuma9e20242007-03-08 00:43:48 +0000137
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods return deliberately bogus results."""

    def write(self, b):
        # Claim twice as many bytes as the parent reports.
        reported = super().write(b)
        return reported * 2

    def read(self, n=None):
        # Hand back the parent's data repeated twice.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        # A negative position.
        return -123

    def tell(self):
        # A negative position.
        return -456

    def readinto(self, buf):
        # Let the parent fill the buffer, then report five times its length.
        super().readinto(buf)
        return 5 * len(buf)
154
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO combined with the C implementation's RawIOBase."""


class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO combined with the pure-Python RawIOBase."""
160
161
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() raises OSError.

    The ``closed`` flag is set before raising, so later close() calls
    return quietly.
    """

    closed = 0

    def close(self):
        if self.closed:
            return
        self.closed = 1
        raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000169
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO combined with the C implementation's RawIOBase."""


class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO combined with the pure-Python RawIOBase."""
175
176
class MockFileIO:
    """Mixin that logs the outcome of every read()/readinto() call.

    ``read_history`` collects, per call, the number of bytes obtained
    (or ``None`` when the underlying read() returned ``None``).  The class
    relies on ``super()``, so it must be combined with a concrete stream
    class that accepts ``data`` in its constructor.
    """

    def __init__(self, data):
        # Create the log first so it exists while the base class initialises.
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000192
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO on top of the C implementation's BytesIO."""


class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO on top of the pure-Python BytesIO."""
198
199
class MockUnseekableIO:
    """Mixin that makes a stream report itself as non-seekable.

    seek() and tell() raise ``self.UnsupportedOperation``, which the
    concrete subclass must supply as a class attribute.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        failure = self.UnsupportedOperation("not seekable")
        raise failure

    def tell(self, *args):
        failure = self.UnsupportedOperation("not seekable")
        raise failure
209
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """Non-seekable BytesIO for the C implementation."""

    UnsupportedOperation = io.UnsupportedOperation


class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """Non-seekable BytesIO for the pure-Python implementation."""

    UnsupportedOperation = pyio.UnsupportedOperation
215
216
class MockNonBlockWriterIO:
    """Raw writer that can simulate a non-blocking stream running full.

    After block_on(char), a write() whose data contains ``char`` only
    accepts the bytes in front of it.  When ``char`` is the very first
    byte, the blocker is cleared and write() returns ``None``.
    """

    def __init__(self):
        self._blocker_char = None
        self._write_stack = []

    def pop_written(self):
        # Join everything accepted so far, then empty the list in place.
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        payload = bytes(b)
        blocker = self._blocker_char
        if blocker:
            try:
                cut = payload.index(blocker)
            except ValueError:
                # Blocker not present: the whole payload is accepted below.
                cut = None
            if cut is not None:
                if cut <= 0:
                    # Blocker is the first byte: cancel it and signal
                    # "would block".
                    self._blocker_char = None
                    return None
                # Accept only the data in front of the blocker.
                self._write_stack.append(payload[:cut])
                return cut
        self._write_stack.append(payload)
        return len(payload)
260
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO for the C implementation."""

    BlockingIOError = io.BlockingIOError


class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO for the pure-Python implementation."""

    BlockingIOError = pyio.BlockingIOError
266
Guido van Rossuma9e20242007-03-08 00:43:48 +0000267
class IOTest(unittest.TestCase):
    """General tests of the io module, written against ``self`` attributes.

    The implementation under test is reached only through attributes such
    as ``self.open``, ``self.FileIO``, ``self.BytesIO``, ``self.IOBase`` or
    ``self.UnsupportedOperation``; none of them is defined in this class,
    so they have to be provided from outside it.
    """

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        """Run a fixed write/seek/truncate sequence on the writable *f*."""
        self.assertEqual(f.write(b"blah."), 5)
        f.truncate(0)
        # truncate() must not move the stream position.
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Run a fixed read/seek sequence on *f* holding b"hello world\\n".

        With *buffered* true, argument-less read() calls are checked too.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    LARGE = 2**31

    def large_file_ops(self, f):
        """Seek, write and truncate around offset ``self.LARGE`` on *f*."""
        # Use unittest assertions rather than bare ``assert`` statements so
        # the checks are not stripped when running under ``python -O``.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        exc = self.UnsupportedOperation
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(exc, fp.read)
                self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "wb", buffering=0) as fp:
            self.assertRaises(exc, fp.read)
            self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "rb", buffering=0) as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(exc, fp.write, "blah")
            self.assertRaises(exc, fp.writelines, ["blah\n"])
            # Non-zero seeking from current or end pos
            self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
            self.assertRaises(exc, fp.seek, -1, self.SEEK_END)

    def test_open_handles_NUL_chars(self):
        # A NUL character in a str or bytes file name must be rejected.
        fn_with_NUL = 'foo\0bar'
        self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
        self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')

    def test_raw_file_io(self):
        # Unbuffered binary file: write it, then read it back.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        # Same as test_raw_file_io, but with default buffering.
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        # The same write/read sequences must work on an in-memory stream.
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; It takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform.startswith('win') or sys.platform == 'darwin':
            support.requires(
                'largefile',
                'test requires %s bytes and a long time to run' % self.LARGE)
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # The file must be closed on leaving the with-block, whether it is
        # left normally or through an exception.
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            # assertGreater reports the actual value on failure.
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        # Dropping the last reference must run __del__, close() and flush()
        # in that order, and the written data must reach the file.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        with support.check_warnings(('', ResourceWarning)):
            f = MyFileIO(support.TESTFN, "wb")
            f.write(b"xxx")
            del f
            support.gc_collect()
            self.assertEqual(record, [1, 2, 3])
            with self.open(support.TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        """Check the __del__/close()/flush() order for a subclass of *base*."""
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        # Data written before the with-block closes the file must be on disk.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        # write() of an array must report its size in bytes, not in items.
        a = array.array('i', range(10))
        n = len(a.tobytes())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            # Second wrapper around the same descriptor that does not own it.
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        with support.check_warnings(('', ResourceWarning)):
            f = self.FileIO(support.TESTFN, "wb")
            f.write(b"abcxxx")
            # Reference cycle, so only the garbage collector can free it.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise OSError()
        f.flush = bad_flush
        self.assertRaises(OSError, f.close) # exception not swallowed
        self.assertTrue(f.closed)

    def test_multi_close(self):
        # Closing repeatedly is harmless; using the closed file is an error.
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")

    def test_types_have_dict(self):
        test = (
            self.IOBase(),
            self.RawIOBase(),
            self.TextIOBase(),
            self.StringIO(),
            self.BytesIO()
        )
        for obj in test:
            self.assertTrue(hasattr(obj, "__dict__"))

    def test_opener(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        fd = os.open(support.TESTFN, os.O_RDONLY)
        def opener(path, flags):
            # Ignore the requested path and hand back the prepared fd.
            return fd
        with self.open("non-existent", "r", opener=opener) as f:
            self.assertEqual(f.read(), "egg\n")

    def test_fileio_closefd(self):
        # Issue #4841
        with self.open(__file__, 'rb') as f1, \
             self.open(__file__, 'rb') as f2:
            fileio = self.FileIO(f1.fileno(), closefd=False)
            # .__init__() must not close f1
            fileio.__init__(f2.fileno(), closefd=False)
            f1.readline()
            # .close() must not close f2
            fileio.close()
            f2.readline()

    def test_nonbuffered_textio(self):
        # Rejecting buffering=0 in text mode must not emit any warning.
        with warnings.catch_warnings(record=True) as recorded:
            with self.assertRaises(ValueError):
                self.open(support.TESTFN, 'w', buffering=0)
            support.gc_collect()
        self.assertEqual(recorded, [])

    def test_invalid_newline(self):
        # Rejecting a bad newline argument must not emit any warning.
        with warnings.catch_warnings(record=True) as recorded:
            with self.assertRaises(ValueError):
                self.open(support.TESTFN, 'w', newline='invalid')
            support.gc_collect()
        self.assertEqual(recorded, [])
669
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200670
class CIOTest(IOTest):
    """IOTest plus a regression test for the C implementation's finalizer."""

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        obj.obj = obj
        wr = weakref.ref(obj)
        del MyIO
        del obj
        support.gc_collect()
        # assertIsNone reports the surviving object if collection failed.
        self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000690
class PyIOTest(IOTest):
    """IOTest without any additional test methods."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000693
Guido van Rossuma9e20242007-03-08 00:43:48 +0000694
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom.
    # The class under test is read from ``self.tp``; the mock raw streams
    # are read from attributes such as ``self.MockRawIO``.

    def test_detach(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # detach() returns the very object that was wrapped ...
        self.assertIs(bufio.detach(), rawio)
        # ... and a second detach() is an error.
        with self.assertRaises(ValueError):
            bufio.detach()

        repr(bufio)  # Should still work

    def test_fileno(self):
        bufio = self.tp(self.MockRawIO())

        self.assertEqual(42, bufio.fileno())

    @unittest.skip('test having existential crisis')
    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        bufio = self.tp(self.MockRawIO())
        # Invalid whence
        for bad_whence in (-1, 9):
            with self.assertRaises(ValueError):
                bufio.seek(0, bad_whence)

    def test_override_destructor(self):
        # A subclass overriding __del__, close() and flush() must see them
        # invoked in that order when the object is destroyed (flush() only
        # being expected for writable objects).
        base = self.tp
        calls = []

        class MyBufferedIO(base):
            def __del__(self):
                calls.append(1)
                try:
                    parent_del = super().__del__
                except AttributeError:
                    # The base class has no finalizer to chain to.
                    return
                parent_del()

            def close(self):
                calls.append(2)
                super().close()

            def flush(self):
                calls.append(3)
                super().flush()

        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        expected = [1, 2, 3] if bufio.writable() else [1, 2]
        del bufio
        support.gc_collect()
        self.assertEqual(calls, expected)

    def test_context_manager(self):
        # Test usability as a context manager
        bufio = self.tp(self.MockRawIO())

        def enter_and_leave():
            with bufio:
                pass

        enter_and_leave()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, enter_and_leave)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()

        def touch_missing_attribute():
            self.tp(rawio).xyzzy

        with support.captured_output("stderr") as stderr:
            self.assertRaises(AttributeError, touch_missing_attribute)
        output = stderr.getvalue().strip()
        if not output:
            return
        # The destructor *may* have printed an unraisable error, check it
        self.assertEqual(len(output.splitlines()), 1)
        self.assertTrue(output.startswith("Exception OSError: "), output)
        self.assertTrue(output.endswith(" ignored"), output)

    def test_repr(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        qualname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        # Without a name on the raw stream only the class is shown.
        self.assertEqual(repr(bufio), "<%s>" % qualname)
        # A str or bytes name on the raw stream shows up in the repr.
        rawio.name = "dummy"
        self.assertEqual(repr(bufio), "<%s name='dummy'>" % qualname)
        rawio.name = b"dummy"
        self.assertEqual(repr(bufio), "<%s name=b'dummy'>" % qualname)

    def test_flush_error_on_close(self):
        rawio = self.MockRawIO()

        def failing_flush():
            raise OSError()

        rawio.flush = failing_flush
        bufio = self.tp(rawio)
        with self.assertRaises(OSError):  # exception not swallowed
            bufio.close()
        self.assertTrue(bufio.closed)

    def test_close_error_on_close(self):
        rawio = self.MockRawIO()

        def failing_flush():
            raise OSError('flush')

        def failing_close():
            raise OSError('close')

        rawio.close = failing_close
        bufio = self.tp(rawio)
        bufio.flush = failing_flush
        with self.assertRaises(OSError) as caught:  # exception not swallowed
            bufio.close()
        error = caught.exception
        # The close() failure propagates, with the flush() failure chained
        # to it as its context.
        self.assertEqual(error.args, ('close',))
        self.assertIsInstance(error.__context__, OSError)
        self.assertEqual(error.__context__.args, ('flush',))
        self.assertFalse(bufio.closed)

    def test_nonnormalized_close_error_on_close(self):
        # Issue #21677
        rawio = self.MockRawIO()

        # Both helpers deliberately reference undefined names so that the
        # exception raised is a NameError.
        def failing_flush():
            raise non_existing_flush

        def failing_close():
            raise non_existing_close

        rawio.close = failing_close
        bufio = self.tp(rawio)
        bufio.flush = failing_flush
        with self.assertRaises(NameError) as caught:  # exception not swallowed
            bufio.close()
        error = caught.exception
        self.assertIn('non_existing_close', str(error))
        self.assertIsInstance(error.__context__, NameError)
        self.assertIn('non_existing_flush', str(error.__context__))
        self.assertFalse(bufio.closed)

    def test_multi_close(self):
        bufio = self.tp(self.MockRawIO())
        # Closing repeatedly is allowed ...
        for _ in range(3):
            bufio.close()
        # ... but flushing a closed object is not.
        self.assertRaises(ValueError, bufio.flush)

    def test_unseekable(self):
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        unsupported = self.UnsupportedOperation
        self.assertRaises(unsupported, bufio.tell)
        self.assertRaises(unsupported, bufio.seek, 0)

    def test_readonly_attributes(self):
        bufio = self.tp(self.MockRawIO())
        replacement = self.MockRawIO()
        # The ``raw`` attribute cannot be rebound.
        with self.assertRaises(AttributeError):
            bufio.raw = replacement
851
Guido van Rossum78892e42007-04-06 17:31:18 +0000852
class SizeofTest:
    # CPython-only checks relating sys.getsizeof() of a buffered object
    # (``self.tp``) to the buffer size it was created with.

    @support.cpython_only
    def test_sizeof(self):
        small_buf, large_buf = 4096, 8192
        bufio = self.tp(self.MockRawIO(), buffer_size=small_buf)
        # Reported size with the buffer's own bytes subtracted.
        overhead = sys.getsizeof(bufio) - small_buf
        bufio = self.tp(self.MockRawIO(), buffer_size=large_buf)
        self.assertEqual(sys.getsizeof(bufio), overhead + large_buf)

    @support.cpython_only
    def test_buffer_freeing(self):
        bufsize = 4096
        bufio = self.tp(self.MockRawIO(), buffer_size=bufsize)
        overhead = sys.getsizeof(bufio) - bufsize
        bufio.close()
        # Once closed, the reported size must no longer include the buffer.
        self.assertEqual(sys.getsizeof(bufio), overhead)
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200874
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Tests for buffered reading.  The class under test is read from
    # ``self.tp``; the mock raw streams come from attributes such as
    # ``self.MockRawIO``.
    read_mode = "rb"

    def test_constructor(self):
        # __init__() may be called again on an existing object, with or
        # without an explicit buffer size; non-positive sizes are rejected.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_uninitialized(self):
        # An object created with __new__() only can be deleted safely ...
        bufio = self.tp.__new__(self.tp)
        del bufio
        # ... and refuses to read until __init__() has been called.
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.read, 0)
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.read(0), b'')

    def test_read(self):
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # read1() is checked against the raw stream's read counter
        # (rawio._reads) after every call.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        # A short read only overwrites the start of the target buffer.
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")
        # Same again with a raw stream that yields None after its data.
        rawio = self.MockRawIO((b"abc", None))
        bufio = self.tp(rawio)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"cb")

    def test_readlines(self):
        # A fresh buffered object is built for every readlines() call.
        def bufio():
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes passed to bufio.read(), and the
        # raw read sizes expected in rawio.read_history afterwards.
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())

    def test_read_past_eof(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            with self.open(support.TESTFN, "wb") as f:
                f.write(bytes(values))
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def reader():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            chunk = bufio.read(n)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=reader) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                # Every byte value must have been read exactly N times
                # across all threads.
                data = b''.join(results)
                for i in range(256):
                    self.assertEqual(data.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        # The same must hold once some data has been read.
        bufio.read(1)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        self.assertRaises(self.UnsupportedOperation, bufio.tell)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            rawio = self.MockRawIO([b"x" * n])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            # Simple case: one raw read is enough to satisfy the request.
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1080
1081
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the BufferedReaderTest and SizeofTest cases against
    # io.BufferedReader, plus the checks defined below.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After every rejected __init__() call, reading must fail too.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(OSError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends up in gc.garbage instead
        # Opening in "w+b" mode creates the file, so remove it afterwards.
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            # Put the object in a reference cycle with itself.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1128
1129
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the BufferedReaderTest cases against pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001132
Guido van Rossuma9e20242007-03-08 00:43:48 +00001133
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Tests for buffered writing.  The class under test is read from
    # ``self.tp``; the mock raw streams come from attributes such as
    # ``self.MockRawIO``.
    write_mode = "wb"

    def test_constructor(self):
        # __init__() may be called again on an existing object, with or
        # without an explicit buffer size; non-positive sizes are rejected.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_uninitialized(self):
        # An object created with __new__() only can be deleted safely ...
        bufio = self.tp.__new__(self.tp)
        del bufio
        # ... and refuses to write until __init__() has been called.
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.write, b'')
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.write(b''), 0)

    def test_detach_flush(self):
        # detach() must push still-buffered data to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # intermediate_func is called with the buffered object after
        # every single write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seek around between writes, always returning to the position
        # the write left off at: first with absolute offsets ...
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        # ... then with relative ones.
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        # By now the overwrite at offset 0 must be visible in the raw stream.
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        l = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # Same as test_writelines, with a UserList in place of a list.
        l = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
        self.assertRaises(TypeError, bufio.writelines, None)
        self.assertRaises(TypeError, bufio.writelines, 'abc')

    def test_destructor(self):
        # Destroying the buffered object must flush pending data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # The test creates a real file, so remove it afterwards.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def writer():
                    try:
                        while True:
                            try:
                                chunk = queue.popleft()
                            except IndexError:
                                # Queue drained: this thread is done.
                                return
                            bufio.write(chunk)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=writer) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                data = f.read()
            # Every byte value must occur exactly N times in the file.
            for i in range(256):
                self.assertEqual(data.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)
        self.assertRaises(OSError, bufio.write, b"abcdef")

    def test_max_buffer_size_removal(self):
        # A third positional argument is rejected.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        raw = self.MockRawIO()
        def bad_write(b):
            raise OSError()
        raw.write = bad_write
        b = self.tp(raw)
        b.write(b'spam')
        self.assertRaises(OSError, b.close) # exception not swallowed
        self.assertTrue(b.closed)
1391
Benjamin Peterson59406a92009-03-26 17:10:29 +00001392
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the shared BufferedWriterTest cases against io.BufferedWriter and
    # adds checks that only apply to this implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        writer = self.tp(raw)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            writer.__init__(raw, sys.maxsize)

    def test_initialization(self):
        # Re-initializing with an invalid buffer size must fail with
        # ValueError, and so must a subsequent write on the same object.
        raw = self.MockRawIO()
        writer = self.tp(raw)
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                writer.__init__(raw, buffer_size=bad_size)
            with self.assertRaises(ValueError):
                writer.write(b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        with support.check_warnings(('', ResourceWarning)):
            raw = self.FileIO(support.TESTFN, "w+b")
            writer = self.tp(raw)
            writer.write(b"123xxx")
            # Self-reference: the object can only go away via the cycle GC.
            writer.x = writer
            wr = weakref.ref(writer)
            del writer
            support.gc_collect()
        self.assertTrue(wr() is None, wr)
        with self.open(support.TESTFN, "rb") as stored:
            self.assertEqual(stored.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        self.assertRaisesRegex(TypeError, "BufferedWriter",
                               self.tp, io.BytesIO(), 1024, 1024, 1024)
1436
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001437
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriterTest cases against the BufferedWriter
    # class exposed by the `pyio` module.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001440
class BufferedRWPairTest(unittest.TestCase):
    # Implementation-independent BufferedRWPair tests.  Concrete subclasses
    # set ``tp`` to the class under test.

    def _mock_pair(self):
        # Pair whose reader and writer are both fresh MockRawIO objects.
        return self.tp(self.MockRawIO(), self.MockRawIO())

    def _pair_reading(self, payload):
        # Pair whose reader is a BytesIO over *payload* and whose writer is
        # a fresh MockRawIO.
        return self.tp(self.BytesIO(payload), self.MockRawIO())

    def test_constructor(self):
        self.assertFalse(self._mock_pair().closed)

    def test_uninitialized(self):
        # An object created without __init__ can be dropped, rejects
        # read/write with a descriptive error, and works once initialized.
        blank = self.tp.__new__(self.tp)
        del blank
        blank = self.tp.__new__(self.tp)
        expected_errors = (ValueError, AttributeError)
        expected_text = 'uninitialized|has no attribute'
        self.assertRaisesRegex(expected_errors, expected_text, blank.read, 0)
        self.assertRaisesRegex(expected_errors, expected_text,
                               blank.write, b'')
        blank.__init__(self.MockRawIO(), self.MockRawIO())
        self.assertEqual(blank.read(0), b'')
        self.assertEqual(blank.write(b''), 0)

    def test_detach(self):
        pair = self._mock_pair()
        with self.assertRaises(self.UnsupportedOperation):
            pair.detach()

    def test_constructor_max_buffer_size_removal(self):
        # A fourth positional argument must be rejected with TypeError.
        with self.assertRaises(TypeError):
            reader, writer = self.MockRawIO(), self.MockRawIO()
            self.tp(reader, writer, 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        make_pair = self.tp
        unreadable, writer = NotReadable(), self.MockRawIO()
        with self.assertRaises(OSError):
            make_pair(unreadable, writer)

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        make_pair = self.tp
        reader, unwritable = self.MockRawIO(), NotWriteable()
        with self.assertRaises(OSError):
            make_pair(reader, unwritable)

    def test_read(self):
        pair = self._pair_reading(b"abcdef")

        for size, chunk in ((3, b"abc"), (1, b"d")):
            self.assertEqual(pair.read(size), chunk)
        self.assertEqual(pair.read(), b"ef")
        pair = self._pair_reading(b"abc")
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        data = b"abc\ndef\nh"
        all_lines = [b"abc\n", b"def\n", b"h"]
        self.assertEqual(self._pair_reading(data).readlines(), all_lines)
        self.assertEqual(self._pair_reading(data).readlines(), all_lines)
        self.assertEqual(self._pair_reading(data).readlines(5),
                         all_lines[:2])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self._pair_reading(b"abcdef")

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self._pair_reading(b"abcdef")

        target = bytearray(5)
        self.assertEqual(pair.readinto(target), 5)
        self.assertEqual(target, b"abcde")

    def test_write(self):
        sink = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), sink)

        # Each write is flushed separately, so the raw writer must see two
        # distinct chunks.
        for chunk in (b"abc", b"def"):
            pair.write(chunk)
            pair.flush()
        self.assertEqual(sink._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self._pair_reading(b"abcdef")

        peeked = pair.peek(3)
        self.assertTrue(peeked.startswith(b"abc"))
        # peek() must not have consumed anything.
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        self.assertTrue(self._mock_pair().readable())

    def test_writeable(self):
        self.assertTrue(self._mock_pair().writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        self.assertFalse(self._mock_pair().seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self._mock_pair()
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # The pair reports a tty as soon as either side does.
        combinations = ((False, False), (True, False),
                        (False, True), (True, True))
        for reader_tty, writer_tty in combinations:
            pair = self.tp(SelectableIsAtty(reader_tty),
                           SelectableIsAtty(writer_tty))
            if reader_tty or writer_tty:
                self.assertTrue(pair.isatty())
            else:
                self.assertFalse(pair.isatty())

    def test_weakref_clearing(self):
        # Drop the pair while a weak reference to it is still alive, then
        # drop the weak reference itself.
        pair = self._mock_pair()
        ref = weakref.ref(pair)
        pair = None
        ref = None  # Shouldn't segfault.
1577
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPairTest cases against io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001580
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPairTest cases against the BufferedRWPair
    # class exposed by the `pyio` module.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001583
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001584
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Shared BufferedRandom tests.  Inherits both the reader and the writer
    # test suites and adds checks for interleaved reading, writing and
    # seeking on one object.  Concrete subclasses set ``tp``.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Run both inherited constructor checks explicitly.
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_uninitialized(self):
        # Run both inherited uninitialized-object checks explicitly.
        BufferedReaderTest.test_uninitialized(self)
        BufferedWriterTest.test_uninitialized(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        # After the read below, the two buffered writes are expected to
        # have reached the raw stream as one chunk.
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        # Overwrite 4 bytes at offset 4, then re-read the whole stream.
        rw.write(b"123f")
        rw.seek(0, 0)
        self.assertEqual(b"asdf123fl", rw.read())
        self.assertEqual(9, rw.tell())
        # whence=2: offset from the end of the 9-byte stream.
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        # whence=1: offset from the current position.
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        rw.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        # A float offset is rejected.
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Shared body for the test_flush_and_* tests.  *read_func* is called
        # as read_func(bufio) or read_func(bufio, n) and must return the
        # bytes it consumed from bufio.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        # Modify the raw stream behind the buffered object's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # Use a 9999-byte scratch buffer when no size is requested.
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            # peek() does not advance the position, so consume explicitly.
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        # Both writes landed back to back at the start of the stream.
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        # Run both inherited thread tests explicitly.
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    # NOTE(review): check_writes() used by the four tests below is inherited
    # and defined outside this excerpt; presumably it invokes the given
    # callable between writes -- confirm against its definition.

    def test_writes_and_peek(self):
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)
        # Second variant: step back one byte, peek, then restore the position.
        def _peek(bufio):
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek)

    def test_writes_and_reads(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            s = raw.getvalue()
            self.assertEqual(s,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        b = b"\x80\x81\x82\x83\x84"
        for i in range(0, len(b)):
            for j in range(i, len(b)):
                raw = self.BytesIO(b)
                bufio = self.tp(raw, 100)
                mutate(bufio, i, j)
                bufio.flush()
                # Expected result: byte j becomes 2, then byte i becomes 1
                # (when i == j the later write of 1 wins).
                expected = bytearray(b)
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        # truncate() without a size must use the logical position, both
        # after a read and after a buffered write.
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        # Run both inherited misbehaved-raw-stream tests explicitly.
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        # Alternate one-byte writes with each kind of one-byte read; every
        # write must land at the position where the previous read stopped.
        with self.BytesIO(b'abcdefgh') as raw:
            with self.tp(raw, 100) as f:
                f.write(b"1")
                self.assertEqual(f.read(1), b'b')
                f.write(b'2')
                self.assertEqual(f.read1(1), b'd')
                f.write(b'3')
                buf = bytearray(1)
                f.readinto(buf)
                self.assertEqual(buf, b'f')
                f.write(b'4')
                self.assertEqual(f.peek(1), b'h')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        # Same idea, but starting with a read instead of a write.
        with self.BytesIO(b'abc') as raw:
            with self.tp(raw, 100) as f:
                self.assertEqual(f.read(1), b'a')
                f.write(b"2")
                self.assertEqual(f.read(1), b'c')
                f.flush()
                self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        # Alternate one-byte writes with readline(); each write overwrites
        # the first byte of the next line.
        with self.BytesIO(b'ab\ncdef\ng\n') as raw:
            with self.tp(raw) as f:
                f.write(b'1')
                self.assertEqual(f.readline(), b'b\n')
                f.write(b'2')
                self.assertEqual(f.readline(), b'def\n')
                f.write(b'3')
                self.assertEqual(f.readline(), b'\n')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    # Binding the name to None shadows any inherited test_unseekable.
    test_unseekable = None
1812
R David Murray67bfe802013-02-23 21:51:05 -05001813
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    # Runs the shared BufferedRandomTest cases against io.BufferedRandom and
    # adds checks that only apply to this implementation.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            buffered.__init__(raw, sys.maxsize)

    def test_garbage_collection(self):
        # Exercise the reader and then the writer garbage-collection test.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        self.assertRaisesRegex(TypeError, "BufferedRandom",
                               self.tp, io.BytesIO(), 1024, 1024, 1024)
1835
1836
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the shared BufferedRandomTest cases against the BufferedRandom
    # class exposed by the `pyio` module.
    tp = pyio.BufferedRandom
1839
1840
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001841# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1842# properties:
1843# - A single output character can correspond to many bytes of input.
1844# - The number of input bytes to complete the character can be
1845# undetermined until the last input byte is received.
1846# - The number of input bytes can vary depending on previous input.
1847# - A single input byte can correspond to many characters of output.
1848# - The number of output characters can be undetermined until the
1849# last input byte is received.
1850# - The number of output characters can vary depending on previous input.
1851
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    A stateful, buffering decoder for testing seek/tell behavior.

    The input is a stream of words.  While I (the input length) is nonzero,
    every I bytes form one word.  While I is 0, words are variable-length
    and terminated by a period; extra periods are ignored.  Words are
    interpreted as follows:
      - 'i' followed by a number sets I (capped at 99).  A missing number
        means 0, i.e. period-terminated words.
      - 'o' followed by a number sets the output length O (capped at 99).
        A missing number means 0.
      - Any other word is emitted followed by a period.  If O is nonzero,
        the word is first truncated, or padded with hyphens, to exactly O
        characters; if O is 0 it is emitted verbatim.
    I and O both start at 1.  With final=True, decode() treats whatever is
    still buffered as one last word.
    """

    # lookupTestDecoder() only answers while this flag is true.
    codecEnabled = False

    def __init__(self, errors='strict'):
        super().__init__(errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % (id(self),)

    def reset(self):
        """Restore the initial state: I = O = 1 and an empty buffer."""
        self.i = self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags) with I and O packed into flags."""
        # Both values are XORed with 1 so that flags = 0 after reset().
        flags = (self.i ^ 1) * 100 + (self.o ^ 1)
        return bytes(self.buffer), flags

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        pending, flags = state
        self.buffer = bytearray(pending)
        packed_i, packed_o = divmod(flags, 100)
        self.i = packed_i ^ 1
        self.o = packed_o ^ 1

    def decode(self, input, final=False):
        """Feed the bytes of *input* and return the text produced so far."""
        pieces = []
        period = ord('.')
        for byte in input:
            if self.i == 0 and byte == period:
                # variable-length mode: a period ends the word, if any
                if self.buffer:
                    pieces.append(self.process_word())
                continue
            self.buffer.append(byte)
            # fixed-length mode: the word ends after self.i bytes
            if self.i != 0 and len(self.buffer) == self.i:
                pieces.append(self.process_word())
        if final and self.buffer:  # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Interpret the buffered word, clear the buffer, return its output."""
        marker = self.buffer[0]
        result = ''
        if marker == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif marker == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            text = self.buffer.decode('ascii')
            if self.o:
                # pad out with hyphens, then truncate to output length
                text = text.ljust(self.o, '-')[:self.o]
            result = text + '.'
        self.buffer = bytearray()
        return result

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function for the 'test_decoder' codec.

        Returns a CodecInfo using this class as incremental decoder when
        the codec is enabled and *name* is 'test_decoder'; otherwise None.
        """
        if not (cls.codecEnabled and name == 'test_decoder'):
            return None
        latin1 = codecs.lookup('latin-1')
        return codecs.CodecInfo(
            name='test_decoder', encode=latin1.encode, decode=None,
            incrementalencoder=None,
            streamreader=None, streamwriter=None,
            incrementaldecoder=cls)
1936
# Register the previous decoder's lookup function as a codec search
# function for testing.  It is disabled by default (codecEnabled is False);
# tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1940
1941
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.'
         'b----------------------------.'
         'cde--------------------------.'
         'abcdefghijabcde.'
         'a.b------------.'
         '.c.------------.'
         'd.e------------.'
         'k--------------.'
         'l--------------.'
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for data, at_eof, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(data, at_eof), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001984
1985class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001986
def setUp(self):
    # Remove any leftover TESTFN file, then set up the fixtures: the same
    # five lines once with mixed line endings (bytes) and once with "\n"
    # line endings only (str).
    support.unlink(support.TESTFN)
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = str(b"AAA\nBBB\nCCC\nDDD\nEEE\n", "ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001991
def tearDown(self):
    # Remove the TESTFN file a test may have created.
    support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001994
def test_constructor(self):
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    text = self.TextIOWrapper(buffered)
    # Calling __init__ again must reconfigure the existing wrapper.
    text.__init__(buffered, encoding="latin-1", newline="\r\n")
    self.assertEqual(text.encoding, "latin-1")
    self.assertEqual(text.line_buffering, False)
    text.__init__(buffered, encoding="utf-8", line_buffering=True)
    self.assertEqual(text.encoding, "utf-8")
    self.assertEqual(text.line_buffering, True)
    self.assertEqual("\xe9\n", text.readline())
    # Invalid newline arguments: wrong type, then unsupported value.
    with self.assertRaises(TypeError):
        text.__init__(buffered, newline=42)
    with self.assertRaises(ValueError):
        text.__init__(buffered, newline='xyzzy')
2008
def test_non_text_encoding_codecs_are_rejected(self):
    # Ensure the constructor complains if passed a codec that isn't
    # marked as a text encoding
    # http://bugs.python.org/issue20404
    buffered = self.BufferedWriter(self.BytesIO())
    self.assertRaisesRegex(LookupError, "is not a text encoding",
                           self.TextIOWrapper, buffered, encoding="hex")
2017
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)
    text = self.TextIOWrapper(buffered)
    # detach() hands back the very buffer object the wrapper was given.
    self.assertIs(text.detach(), buffered)

    text = self.TextIOWrapper(buffered, encoding="ascii")
    text.write("howdy")
    # Nothing has reached the raw stream yet; detach() must flush it.
    self.assertFalse(raw.getvalue())
    text.detach()
    self.assertEqual(raw.getvalue(), b"howdy")
    with self.assertRaises(ValueError):
        text.detach()

    # Operations independent of the detached stream should still work
    repr(text)
    self.assertEqual(text.encoding, "ascii")
    self.assertEqual(text.errors, "strict")
    self.assertFalse(text.line_buffering)
2036
def test_repr(self):
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    text = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    def expect(template):
        # Compare repr(text) with *template* filled in with the module name.
        self.assertEqual(repr(text), template % modname)

    expect("<%s.TextIOWrapper encoding='utf-8'>")
    raw.name = "dummy"
    expect("<%s.TextIOWrapper name='dummy' encoding='utf-8'>")
    text.mode = "r"
    expect("<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>")
    raw.name = b"dummy"
    expect("<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>")

    text.buffer.detach()
    repr(text)  # Should not raise an exception
2056
def test_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    text = self.TextIOWrapper(buffered, newline="\n", line_buffering=True)
    # (text written, raw stream contents expected right afterwards)
    steps = (
        ("X", b""),                # No flush happened
        ("Y\nZ", b"XY\nZ"),        # All got flushed
        ("A\rB", b"XY\nZA\rB"),
    )
    for chunk, flushed in steps:
        text.write(chunk)
        self.assertEqual(raw.getvalue(), flushed)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002067
def test_default_encoding(self):
    saved_environ = dict(os.environ)
    try:
        # try to get a user preferred encoding different than the current
        # locale encoding to check that TextIOWrapper() uses the current
        # locale encoding and not the user preferred encoding
        for name in ('LC_ALL', 'LANG', 'LC_CTYPE'):
            os.environ.pop(name, None)

        expected = locale.getpreferredencoding(False)
        wrapper = self.TextIOWrapper(self.BytesIO())
        self.assertEqual(wrapper.encoding, expected)
    finally:
        # Restore the environment exactly as it was on entry.
        os.environ.clear()
        os.environ.update(saved_environ)
2085
@support.cpython_only
def test_device_encoding(self):
    # Issue 15989
    # A fileno() result one past INT_MAX or UINT_MAX must make the
    # TextIOWrapper constructor raise OverflowError.
    import _testcapi
    raw = self.BytesIO()
    for limit in (_testcapi.INT_MAX, _testcapi.UINT_MAX):
        raw.fileno = lambda limit=limit: limit + 1
        with self.assertRaises(OverflowError):
            self.TextIOWrapper(raw)
2095
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    raw = self.BytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="utf-8")
    self.assertEqual(wrapper.encoding, "utf-8")
    # Without an explicit encoding a default must still be filled in ...
    wrapper = self.TextIOWrapper(raw)
    default_encoding = wrapper.encoding
    self.assertTrue(default_encoding is not None)
    # ... and it must name a codec that lookup() can find.
    codecs.lookup(default_encoding)
2104
def test_encoding_errors_reading(self):
    payload = b"abc\n\xff\n"
    # (1) default and (2) explicit strict: reading raises UnicodeError
    for options in ({}, {"errors": "strict"}):
        raw = self.BytesIO(payload)
        text = self.TextIOWrapper(raw, encoding="ascii", **options)
        with self.assertRaises(UnicodeError):
            text.read()
    # (3) ignore drops the undecodable byte, (4) replace yields U+FFFD
    lenient_cases = (("ignore", "abc\n\n"), ("replace", "abc\n\ufffd\n"))
    for handler, decoded in lenient_cases:
        raw = self.BytesIO(payload)
        text = self.TextIOWrapper(raw, encoding="ascii", errors=handler)
        self.assertEqual(text.read(), decoded)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002122
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002123 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002124 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002125 b = self.BytesIO()
2126 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002127 self.assertRaises(UnicodeError, t.write, "\xff")
2128 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002129 b = self.BytesIO()
2130 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002131 self.assertRaises(UnicodeError, t.write, "\xff")
2132 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002133 b = self.BytesIO()
2134 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002135 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002136 t.write("abc\xffdef\n")
2137 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002138 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002139 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002140 b = self.BytesIO()
2141 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002142 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002143 t.write("abc\xffdef\n")
2144 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002145 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002146
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002147 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002148 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2149
2150 tests = [
2151 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002152 [ '', input_lines ],
2153 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2154 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2155 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002156 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002157 encodings = (
2158 'utf-8', 'latin-1',
2159 'utf-16', 'utf-16-le', 'utf-16-be',
2160 'utf-32', 'utf-32-le', 'utf-32-be',
2161 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002162
Guido van Rossum8358db22007-08-18 21:39:55 +00002163 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002164 # character in TextIOWrapper._pending_line.
2165 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002166 # XXX: str.encode() should return bytes
2167 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002168 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002169 for bufsize in range(1, 10):
2170 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002171 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2172 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002173 encoding=encoding)
2174 if do_reads:
2175 got_lines = []
2176 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002177 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002178 if c2 == '':
2179 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002180 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002181 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002182 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002183 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002184
2185 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002186 self.assertEqual(got_line, exp_line)
2187 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002188
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002189 def test_newlines_input(self):
2190 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002191 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2192 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002193 (None, normalized.decode("ascii").splitlines(keepends=True)),
2194 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002195 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2196 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2197 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002198 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002199 buf = self.BytesIO(testdata)
2200 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002201 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002202 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002203 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002204
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002205 def test_newlines_output(self):
2206 testdict = {
2207 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2208 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2209 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2210 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2211 }
2212 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2213 for newline, expected in tests:
2214 buf = self.BytesIO()
2215 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2216 txt.write("AAA\nB")
2217 txt.write("BB\nCCC\n")
2218 txt.write("X\rY\r\nZ")
2219 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002220 self.assertEqual(buf.closed, False)
2221 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002222
def test_destructor(self):
    # Dropping the wrapper must close the underlying buffer, and the
    # written text must already be in the buffer when close() runs.
    seen_at_close = []
    parent = self.BytesIO
    class MyBytesIO(parent):
        def close(self):
            seen_at_close.append(self.getvalue())
            parent.close(self)
    raw = MyBytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.write("abc")
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002236
def test_override_destructor(self):
    # A subclass overriding __del__, close and flush must see them
    # called in that order when the object is dropped.
    calls = []
    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            try:
                inherited = super().__del__
            except AttributeError:
                # The base class defines no __del__; nothing to chain to.
                pass
            else:
                inherited()
        def close(self):
            calls.append(2)
            super().close()
        def flush(self):
            calls.append(3)
            super().flush()
    raw = self.BytesIO()
    wrapper = MyTextIO(raw, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2259
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()
    def touch_missing_attribute():
        self.TextIOWrapper(rawio).xyzzy
    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    report = captured.getvalue().strip()
    if not report:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(report.splitlines()), 1)
    self.assertTrue(report.startswith("Exception OSError: "), report)
    self.assertTrue(report.endswith(" ignored"), report)
Guido van Rossum8358db22007-08-18 21:39:55 +00002274
Guido van Rossum9b76da62007-04-11 01:09:03 +00002275 # Systematic tests of the text I/O API
2276
def test_basic_io(self):
    # Exercise write/read/seek/tell on a real file for several chunk
    # sizes and encodings.  Files are opened with "with" so that a
    # failing assertion cannot leak an open handle.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin-1", "utf-8":  # , "utf-16-be", "utf-16-le":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Opaque position of end-of-file, reused below.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2305
def multi_line_test(self, f, enc):
    # Helper: empty the text file f, write lines of many lengths while
    # recording tell() before each one, then read them back and check
    # that every (position, line) pair matches.  enc is not used here.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    written = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        body = "".join(sample[i % len(sample)] for i in range(size))
        line = body + "\n"
        written.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    read_back = []
    while True:
        pos = f.tell()
        line = f.readline()
        if not line:
            break
        read_back.append((pos, line))
    self.assertEqual(read_back, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002327
def test_telling(self):
    # tell() must give the same positions when reading lines back as it
    # gave while writing them, and must be refused during iteration.
    # The file is managed by "with" so a failing assertion cannot leak
    # the handle.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is expected to fail while iterating over the file.
            self.assertRaises(OSError, f.tell)
        self.assertEqual(f.tell(), p2)
2347
def test_seeking(self):
    # Build two lines whose ASCII prefix ends two characters short of
    # the default chunk size, followed by a multi-byte character.
    prefix_size = _default_chunk_size() - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = u_suffix.encode("utf-8")
    with self.open(support.TESTFN, "wb") as f:
        f.write((prefix + suffix) * 2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        head = f.read(prefix_size)
        self.assertEqual(head, str(prefix, "ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002364
def test_seeking_too(self):
    # Regression test for a specific bug: readline() followed by tell()
    # on a multi-byte character with a chunk size of 2.
    with self.open(support.TESTFN, "wb") as binary:
        binary.write(b'\xe0\xbf\xbf\n')
    with self.open(support.TESTFN, "r", encoding="utf-8") as text:
        text._CHUNK_SIZE  # Just test that it exists
        text._CHUNK_SIZE = 2
        text.readline()
        text.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002375
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened with "with" so that a failing assertion
        # cannot leak an open handle.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN, encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for encoded, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(encoded)

        # Position each test case so that it crosses a chunk boundary.
        for encoded, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(encoded)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + encoded, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002422
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002423 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002424 data = "1234567890"
2425 tests = ("utf-16",
2426 "utf-16-le",
2427 "utf-16-be",
2428 "utf-32",
2429 "utf-32-le",
2430 "utf-32-be")
2431 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002432 buf = self.BytesIO()
2433 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002434 # Check if the BOM is written only once (see issue1753).
2435 f.write(data)
2436 f.write(data)
2437 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002438 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002439 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002440 self.assertEqual(f.read(), data * 2)
2441 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002442
Benjamin Petersona1b49012009-03-31 23:11:32 +00002443 def test_unreadable(self):
2444 class UnReadable(self.BytesIO):
2445 def readable(self):
2446 return False
2447 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002448 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00002449
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002450 def test_read_one_by_one(self):
2451 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002452 reads = ""
2453 while True:
2454 c = txt.read(1)
2455 if not c:
2456 break
2457 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002458 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002459
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002460 def test_readlines(self):
2461 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2462 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2463 txt.seek(0)
2464 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2465 txt.seek(0)
2466 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2467
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002468 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002469 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002470 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002471 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002472 reads = ""
2473 while True:
2474 c = txt.read(128)
2475 if not c:
2476 break
2477 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002478 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002479
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002480 def test_writelines(self):
2481 l = ['ab', 'cd', 'ef']
2482 buf = self.BytesIO()
2483 txt = self.TextIOWrapper(buf)
2484 txt.writelines(l)
2485 txt.flush()
2486 self.assertEqual(buf.getvalue(), b'abcdef')
2487
2488 def test_writelines_userlist(self):
2489 l = UserList(['ab', 'cd', 'ef'])
2490 buf = self.BytesIO()
2491 txt = self.TextIOWrapper(buf)
2492 txt.writelines(l)
2493 txt.flush()
2494 self.assertEqual(buf.getvalue(), b'abcdef')
2495
2496 def test_writelines_error(self):
2497 txt = self.TextIOWrapper(self.BytesIO())
2498 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2499 self.assertRaises(TypeError, txt.writelines, None)
2500 self.assertRaises(TypeError, txt.writelines, b'abc')
2501
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002502 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002503 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002504
2505 # read one char at a time
2506 reads = ""
2507 while True:
2508 c = txt.read(1)
2509 if not c:
2510 break
2511 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002512 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002513
2514 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002515 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002516 txt._CHUNK_SIZE = 4
2517
2518 reads = ""
2519 while True:
2520 c = txt.read(4)
2521 if not c:
2522 break
2523 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002524 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002525
2526 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002527 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002528 txt._CHUNK_SIZE = 4
2529
2530 reads = txt.read(4)
2531 reads += txt.read(4)
2532 reads += txt.readline()
2533 reads += txt.readline()
2534 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002535 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002536
2537 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002538 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002539 txt._CHUNK_SIZE = 4
2540
2541 reads = txt.read(4)
2542 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002543 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002544
2545 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002546 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002547 txt._CHUNK_SIZE = 4
2548
2549 reads = txt.read(4)
2550 pos = txt.tell()
2551 txt.seek(0)
2552 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002553 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002554
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002555 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002556 buffer = self.BytesIO(self.testdata)
2557 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002558
2559 self.assertEqual(buffer.seekable(), txt.seekable())
2560
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as out:
            out.write('aaa')
            pos = out.tell()
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), 'aaa'.encode(charset))

        with self.open(path, 'a', encoding=charset) as out:
            out.write('xxx')
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002575
def test_seek_bom(self):
    # Same test, but when seeking manually
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as out:
            out.write('aaa')
            end_of_text = out.tell()
        with self.open(path, 'r+', encoding=charset) as out:
            # Write past the existing text first, then overwrite it
            # from the start.
            out.seek(end_of_text)
            out.write('zzz')
            out.seek(0)
            out.write('bbb')
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002590
def test_errors_property(self):
    # The errors attribute is "strict" unless another handler was
    # requested when opening.
    with self.open(support.TESTFN, "w") as default_file:
        self.assertEqual(default_file.errors, "strict")
    with self.open(support.TESTFN, "w", errors="replace") as lenient_file:
        self.assertEqual(lenient_file.errors, "replace")
2596
@support.no_tracing
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    start = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def run(n):
            text = "Thread%03d\n" % n
            # Hold every thread until all of them have been started.
            start.wait()
            f.write(text)
        workers = [threading.Thread(target=run, args=(x,))
                   for x in range(20)]
        for worker in workers:
            worker.start()
        time.sleep(0.02)
        start.set()
        for worker in workers:
            worker.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
        for n in range(20):
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002619
Antoine Pitrou6be88762010-05-03 16:48:20 +00002620 def test_flush_error_on_close(self):
2621 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2622 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002623 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00002624 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002625 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06002626 self.assertTrue(txt.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00002627
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03002628 def test_close_error_on_close(self):
2629 buffer = self.BytesIO(self.testdata)
2630 def bad_flush():
2631 raise OSError('flush')
2632 def bad_close():
2633 raise OSError('close')
2634 buffer.close = bad_close
2635 txt = self.TextIOWrapper(buffer, encoding="ascii")
2636 txt.flush = bad_flush
2637 with self.assertRaises(OSError) as err: # exception not swallowed
2638 txt.close()
2639 self.assertEqual(err.exception.args, ('close',))
2640 self.assertIsInstance(err.exception.__context__, OSError)
2641 self.assertEqual(err.exception.__context__.args, ('flush',))
2642 self.assertFalse(txt.closed)
2643
2644 def test_nonnormalized_close_error_on_close(self):
2645 # Issue #21677
2646 buffer = self.BytesIO(self.testdata)
2647 def bad_flush():
2648 raise non_existing_flush
2649 def bad_close():
2650 raise non_existing_close
2651 buffer.close = bad_close
2652 txt = self.TextIOWrapper(buffer, encoding="ascii")
2653 txt.flush = bad_flush
2654 with self.assertRaises(NameError) as err: # exception not swallowed
2655 txt.close()
2656 self.assertIn('non_existing_close', str(err.exception))
2657 self.assertIsInstance(err.exception.__context__, NameError)
2658 self.assertIn('non_existing_flush', str(err.exception.__context__))
2659 self.assertFalse(txt.closed)
2660
Antoine Pitrou6be88762010-05-03 16:48:20 +00002661 def test_multi_close(self):
2662 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2663 txt.close()
2664 txt.close()
2665 txt.close()
2666 self.assertRaises(ValueError, txt.flush)
2667
def test_unseekable(self):
    # tell() and seek() on a wrapper around an unseekable stream must
    # raise UnsupportedOperation.
    wrapper = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    unsupported = self.UnsupportedOperation
    self.assertRaises(unsupported, wrapper.tell)
    self.assertRaises(unsupported, wrapper.seek, 0)
2672
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002673 def test_readonly_attributes(self):
2674 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2675 buf = self.BytesIO(self.testdata)
2676 with self.assertRaises(AttributeError):
2677 txt.buffer = buf
2678
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    raw = self.MockRawIO(chunks)
    txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
    # Reads
    self.assertEqual(txt.read(4), 'abcd')
    self.assertEqual(txt.readline(), 'efghi\n')
    self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
2689
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                             write_through=True)
    for piece in ('1', '23\n4', '5'):
        txt.write(piece)
    self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2699
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02002700 def test_bufio_write_through(self):
2701 # Issue #21396: write_through=True doesn't force a flush()
2702 # on the underlying binary buffered object.
2703 flush_called, write_called = [], []
2704 class BufferedWriter(self.BufferedWriter):
2705 def flush(self, *args, **kwargs):
2706 flush_called.append(True)
2707 return super().flush(*args, **kwargs)
2708 def write(self, *args, **kwargs):
2709 write_called.append(True)
2710 return super().write(*args, **kwargs)
2711
2712 rawio = self.BytesIO()
2713 data = b"a"
2714 bufio = BufferedWriter(rawio, len(data)*2)
2715 textio = self.TextIOWrapper(bufio, encoding='ascii',
2716 write_through=True)
2717 # write to the buffered io but don't overflow the buffer
2718 text = data.decode('ascii')
2719 textio.write(text)
2720
2721 # buffer.flush is not called with write_through=True
2722 self.assertFalse(flush_called)
2723 # buffer.write *is* called with write_through=True
2724 self.assertTrue(write_called)
2725 self.assertEqual(rawio.getvalue(), b"") # no flush
2726
2727 write_called = [] # reset
2728 textio.write(text * 10) # total content is larger than bufio buffer
2729 self.assertTrue(write_called)
2730 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
2731
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002732 def test_read_nonbytes(self):
2733 # Issue #17106
2734 # Crash when underlying read() returns non-bytes
2735 t = self.TextIOWrapper(self.StringIO('a'))
2736 self.assertRaises(TypeError, t.read, 1)
2737 t = self.TextIOWrapper(self.StringIO('a'))
2738 self.assertRaises(TypeError, t.readline)
2739 t = self.TextIOWrapper(self.StringIO('a'))
2740 self.assertRaises(TypeError, t.read)
2741
def test_illegal_decoder(self):
    # Issue #17106: crash when the decoder returns a non-string.  Each
    # read entry point is tried on a fresh wrapper and must raise
    # TypeError.
    def _make_illegal_wrapper():
        # Bypass the early encoding check added in issue 20404: flag the
        # "quopri" codec as a text encoding only while the wrapper is
        # being constructed, then restore the flag.
        codec_info = codecs.lookup("quopri")
        codec_info._is_text_encoding = True
        try:
            return self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
                                      newline='\n', encoding="quopri")
        finally:
            codec_info._is_text_encoding = False

    attempts = (
        ("read", (1,)),
        ("readline", ()),
        ("read", ()),
    )
    for method_name, call_args in attempts:
        wrapper = _make_illegal_wrapper()
        self.assertRaises(TypeError, getattr(wrapper, method_name),
                          *call_args)
2761
def _check_create_at_shutdown(self, **kwargs):
    """Run a child interpreter that builds a TextIOWrapper in a __del__.

    Issue #20037: creating a TextIOWrapper at shutdown shouldn't crash
    the interpreter.  *kwargs* are forwarded (via their repr) to the
    TextIOWrapper constructor inside the child script.  Returns whatever
    assert_python_ok() returns for that script.
    """
    script_template = """if 1:
        import codecs
        import {iomod} as io

        # Avoid looking up codecs at shutdown
        codecs.lookup('utf-8')

        class C:
            def __init__(self):
                self.buf = io.BytesIO()
            def __del__(self):
                io.TextIOWrapper(self.buf, **{kwargs})
                print("ok")
        c = C()
        """
    script = script_template.format(iomod=self.io.__name__, kwargs=kwargs)
    return assert_python_ok("-c", script)
2782
def test_create_at_shutdown_without_encoding(self):
    # Without an explicit encoding the child either prints "ok" or writes
    # the implementation-specific shutdown error to stderr.
    _, stdout, stderr = self._check_create_at_shutdown()
    if not stderr:
        self.assertEqual("ok", stdout.decode().strip())
        return
    # Can error out with a RuntimeError if the module state
    # isn't found.
    self.assertIn(self.shutdown_error, stderr.decode())
2791
def test_create_at_shutdown_with_encoding(self):
    # With encoding and errors given explicitly the child must print "ok"
    # and leave stderr empty.
    outcome = self._check_create_at_shutdown(encoding='utf-8',
                                             errors='strict')
    _, stdout, stderr = outcome
    self.assertFalse(stderr)
    self.assertEqual("ok", stdout.decode().strip())
2797
def test_issue22849(self):
    # Issue #22849.  The stub below initially has no tell(); wrapping it
    # is attempted ten times and any exception is deliberately ignored.
    # After tell() is patched in, one more construction must succeed.
    class F(object):
        def readable(self):
            return True

        def writable(self):
            return True

        def seekable(self):
            return True

    for _ in range(10):
        try:
            self.TextIOWrapper(F(), encoding='utf-8')
        except Exception:
            # Best effort on purpose: only the repeated attempt matters.
            # NOTE(review): presumably the original bug was triggered by
            # these failed constructions -- confirm against the issue.
            pass

    F.tell = lambda x: 0
    # Kept in a local so the wrapper stays alive until the test returns.
    wrapper = self.TextIOWrapper(F(), encoding='utf-8')
2812
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002813
class CTextIOWrapperTest(TextIOWrapperTest):
    # TextIOWrapper tests run against the module bound to ``io``.
    io = io
    shutdown_error = "RuntimeError: could not find io module state"

    def test_initialization(self):
        # Calling __init__ again with an invalid newline argument must
        # raise, and read() on the wrapper must then raise ValueError.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        for bad_newline, expected in ((42, TypeError),
                                      ('xyzzy', ValueError)):
            with self.assertRaises(expected):
                wrapper.__init__(buffered, newline=bad_newline)
            with self.assertRaises(ValueError):
                wrapper.read()

        # repr() of an instance that never went through __init__ must
        # raise.
        wrapper = self.TextIOWrapper.__new__(self.TextIOWrapper)
        with self.assertRaises(Exception):
            repr(wrapper)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            raw_file = io.FileIO(support.TESTFN, "wb")
            buffered = self.BufferedWriter(raw_file)
            wrapper = self.TextIOWrapper(buffered, encoding="ascii")
            wrapper.write("456def")
            # Self-reference: only the cycle collector can reclaim it.
            wrapper.x = wrapper
            wrapper_ref = weakref.ref(wrapper)
            del wrapper
            support.gc_collect()
        self.assertIsNone(wrapper_ref(), wrapper_ref)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references
            text_a.buddy = text_b
            text_b.buddy = text_a
        support.gc_collect()
2860
2861
class PyTextIOWrapperTest(TextIOWrapperTest):
    # TextIOWrapper tests run against the module bound to ``pyio``.
    # An alternative expected shutdown error is kept for reference:
    #shutdown_error = "LookupError: unknown encoding: ascii"
    shutdown_error = "TypeError: 'NoneType' object is not iterable"
    io = pyio
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002866
2867
class IncrementalNewlineDecoderTest(unittest.TestCase):
    # Checks for IncrementalNewlineDecoder.  The class under test is read
    # from self.IncrementalNewlineDecoder, which is not defined here.

    def check_newline_decoding_utf8(self, decoder):
        """Run the UTF-8 specific checks against a newline *decoder*."""
        def _check_decode(data, expected, **kwargs):
            # We exercise getstate() / setstate() as well as decode():
            # the same input is decoded twice from the same saved state.
            saved = decoder.getstate()
            self.assertEqual(decoder.decode(data, **kwargs), expected)
            decoder.setstate(saved)
            self.assertEqual(decoder.decode(data, **kwargs), expected)

        def _check_all(steps):
            for data, expected in steps:
                _check_decode(data, expected)

        # A three-byte character, whole and then split across calls; the
        # last step leaves a lone lead byte pending in the decoder.
        _check_all((
            (b'\xe8\xa2\x88', "\u8888"),

            (b'\xe8', ""),
            (b'\xa2', ""),
            (b'\x88', "\u8888"),

            (b'\xe8', ""),
            (b'\xa2', ""),
            (b'\x88', "\u8888"),

            (b'\xe8', ""),
        ))
        # Finalizing with the incomplete sequence pending is an error.
        with self.assertRaises(UnicodeDecodeError):
            decoder.decode(b'', final=True)

        decoder.reset()
        _check_all((
            (b'\n', "\n"),
            (b'\r', ""),
        ))
        _check_decode(b'', "\n", final=True)
        _check_decode(b'\r', "\n", final=True)

        _check_all((
            (b'\r', ""),
            (b'a', "\na"),

            (b'\r\r\n', "\n\n"),
            (b'\r', ""),
            (b'\r', "\n"),
            (b'\na', "\na"),

            (b'\xe8\xa2\x88\r\n', "\u8888\n"),
            (b'\xe8\xa2\x88', "\u8888"),
            (b'\n', "\n"),
            (b'\xe8\xa2\x88\r', "\u8888"),
            (b'\n', "\n"),
        ))

    def check_newline_decoding(self, decoder, encoding):
        """Feed *decoder* one unit at a time and check its ``newlines``.

        With *encoding* set, text is encoded by an incremental encoder and
        fed one byte at a time; with *encoding* None the decoder is fed
        one character at a time.
        """
        decoded = []
        encoder = None
        if encoding is not None:
            encoder = codecs.getincrementalencoder(encoding)()

        def _feed(text):
            if encoder is None:
                # Decode one char at a time
                units = text
            else:
                # Decode one byte at a time
                units = (bytes([value]) for value in encoder.encode(text))
            for unit in units:
                decoded.append(decoder.decode(unit))

        self.assertIsNone(decoder.newlines)
        # (text fed, value of decoder.newlines expected afterwards)
        steps = (
            ("abc\n\r", '\n'),
            ("\nabc", ('\n', '\r\n')),
            ("abc\r", ('\n', '\r\n')),
            ("abc", ('\r', '\n', '\r\n')),
        )
        for text, seen_newlines in steps:
            _feed(text)
            self.assertEqual(decoder.newlines, seen_newlines)
        _feed("abc\r")
        self.assertEqual("".join(decoded), "abc\n\nabcabc\nabcabc")

        # After reset() the record of seen newlines starts over.
        decoder.reset()
        sample = "abc"
        if encoder is not None:
            encoder.reset()
            sample = encoder.encode(sample)
        self.assertEqual(decoder.decode(sample), "abc")
        self.assertIsNone(decoder.newlines)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc is None:
                inner = None
            else:
                inner = codecs.getincrementaldecoder(enc)()
            wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(wrapped, enc)

        inner = codecs.getincrementaldecoder("utf-8")()
        wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding_utf8(wrapped)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder.
        # U+0D00 and U+0A00 must pass through untouched and must not be
        # recorded as newlines, with or without translation.
        for translate in (False, True):
            dec = self.IncrementalNewlineDecoder(None, translate=translate)
            self.assertIsNone(dec.newlines)
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertIsNone(dec.newlines)
2973
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoderTest variant named for the C implementation.

    Adds nothing of its own.
    NOTE(review): presumably load_tests attaches the implementation
    specific attributes to this class -- confirm.
    """
2976
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoderTest variant named for the Python implementation.

    Adds nothing of its own.
    NOTE(review): presumably load_tests attaches the implementation
    specific attributes to this class -- confirm.
    """
Antoine Pitrou180a3362008-12-14 16:36:46 +00002979
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002980
Guido van Rossum01a27522007-03-07 01:00:12 +00002981# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002982
class MiscIOTest(unittest.TestCase):
    # Miscellaneous checks on the io module.  The module under test is
    # self.io; names such as self.open, self.IOBase and
    # self.BlockingIOError are not defined in this class.

    def tearDown(self):
        # Remove the scratch file the tests may have created.
        support.unlink(support.TESTFN)

    def test___all__(self):
        # Every name in __all__ must exist.  Exception-like names must
        # derive from Exception; everything else except open() and the
        # SEEK_* constants must derive from IOBase.
        module = self.io
        for name in module.__all__:
            obj = getattr(module, name, None)
            self.assertIsNotNone(obj, name)
            if name == "open":
                continue
            looks_like_exception = ("error" in name.lower()
                                    or name == "UnsupportedOperation")
            if looks_like_exception:
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase))

    def test_attributes(self):
        scratch = support.TESTFN

        # Unbuffered binary file.
        f = self.open(scratch, "wb", buffering=0)
        self.assertEqual(f.mode, "wb")
        f.close()

        # "U" mode emits a DeprecationWarning; every layer reports the
        # same name while the binary layers report mode "rb".
        with support.check_warnings(('', DeprecationWarning)):
            f = self.open(scratch, "U")
        for layer in (f, f.buffer, f.buffer.raw):
            self.assertEqual(layer.name, scratch)
        self.assertEqual(f.mode, "U")
        self.assertEqual(f.buffer.mode, "rb")
        self.assertEqual(f.buffer.raw.mode, "rb")
        f.close()

        f = self.open(scratch, "w+")
        self.assertEqual(f.mode, "w+")
        self.assertEqual(f.buffer.mode, "rb+")  # Does it really matter?
        self.assertEqual(f.buffer.raw.mode, "rb+")

        # A second file object on the same descriptor, opened with
        # closefd=False; its name is the descriptor number.
        fd = f.fileno()
        g = self.open(fd, "wb", closefd=False)
        self.assertEqual(g.mode, "wb")
        self.assertEqual(g.raw.mode, "wb")
        self.assertEqual(g.name, fd)
        self.assertEqual(g.raw.name, fd)
        f.close()
        g.close()

    def test_io_after_close(self):
        # Every I/O method of a closed file must raise ValueError, for
        # each combination of mode and buffering generated below.
        def _variants():
            for text_mode in ("w", "r", "w+"):
                binary_mode = text_mode + "b"
                yield {"mode": text_mode}
                yield {"mode": binary_mode}
                yield {"mode": text_mode, "buffering": 1}
                yield {"mode": text_mode, "buffering": 2}
                yield {"mode": binary_mode, "buffering": 0}

        for kwargs in _variants():
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            empty = b"" if "b" in kwargs['mode'] else ""
            # (method name, arguments, True if only some layers have it)
            calls = (
                ("flush", (), False),
                ("fileno", (), False),
                ("isatty", (), False),
                ("__iter__", (), False),
                ("peek", (1,), True),
                ("read", (), False),
                ("read1", (1024,), True),
                ("readall", (), True),
                ("readinto", (bytearray(1024),), True),
                ("readline", (), False),
                ("readlines", (), False),
                ("seek", (0,), False),
                ("tell", (), False),
                ("truncate", (), False),
                ("write", (empty,), False),
                ("writelines", ([],), False),
            )
            for method_name, call_args, optional in calls:
                if optional and not hasattr(f, method_name):
                    continue
                self.assertRaises(ValueError, getattr(f, method_name),
                                  *call_args)
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues: an exception and its argument
        # that reference each other must still be collected.
        class C(str):
            pass
        text = C("")
        error = self.BlockingIOError(1, text)
        text.b = error
        error.c = text
        text_ref = weakref.ref(text)
        del text, error
        support.gc_collect()
        self.assertIsNone(text_ref(), text_ref)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        visible_bases = (self.IOBase, self.RawIOBase,
                         self.BufferedIOBase, self.TextIOBase)
        for base in visible_bases:
            self.assertIsInstance(base, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        """Check that opened files are instances of the ABCs in *abcmodule*.

        Each kind of file must be an IOBase and an instance of exactly one
        of RawIOBase, BufferedIOBase and TextIOBase.
        """
        layers = ("RawIOBase", "BufferedIOBase", "TextIOBase")
        # (mode, extra open() keywords, the one layer ABC that must match)
        cases = (
            ("wb", {"buffering": 0}, "RawIOBase"),
            ("wb", {}, "BufferedIOBase"),
            ("w", {}, "TextIOBase"),
        )
        for mode, extra, matching in cases:
            with self.open(support.TESTFN, mode, **extra) as f:
                self.assertIsInstance(f, abcmodule.IOBase)
                for layer in layers:
                    if layer == matching:
                        check = self.assertIsInstance
                    else:
                        check = self.assertNotIsInstance
                    check(f, getattr(abcmodule, layer))

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)

    def _check_warn_on_dealloc(self, *args, **kwargs):
        """Open a file, drop it unclosed, and expect a ResourceWarning.

        The warning message must contain the repr of the file object.
        """
        # NOTE(review): this calls the builtin open(), not self.open(), so
        # the same implementation is exercised whatever self.io is --
        # confirm whether that is intended.
        stream = open(*args, **kwargs)
        stream_repr = repr(stream)
        with self.assertWarns(ResourceWarning) as caught:
            del stream
            support.gc_collect()
        self.assertIn(stream_repr, str(caught.warning.args[0]))

    def test_warn_on_dealloc(self):
        for mode, extra in (("wb", {"buffering": 0}), ("wb", {}), ("w", {})):
            self._check_warn_on_dealloc(support.TESTFN, mode, **extra)

    def _check_warn_on_dealloc_fd(self, *args, **kwargs):
        """Like _check_warn_on_dealloc(), but on the read end of a pipe.

        Also checks that no warning is recorded when the file object is
        opened with closefd=False.
        """
        pipe_fds = []

        def cleanup_fds():
            # Close whatever is still open; descriptors already closed by
            # a file object show up as EBADF and are ignored.
            for fd in pipe_fds:
                try:
                    os.close(fd)
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise
        self.addCleanup(cleanup_fds)

        read_fd, write_fd = os.pipe()
        pipe_fds.extend((read_fd, write_fd))
        self._check_warn_on_dealloc(read_fd, *args, **kwargs)

        # When using closefd=False, there's no warning
        read_fd, write_fd = os.pipe()
        pipe_fds.extend((read_fd, write_fd))
        with warnings.catch_warnings(record=True) as recorded:
            # NOTE(review): builtin open() again rather than self.open()
            # -- confirm.
            open(read_fd, *args, closefd=False, **kwargs)
            support.gc_collect()
        self.assertEqual(recorded, [])

    def test_warn_on_dealloc_fd(self):
        for mode, extra in (("rb", {"buffering": 0}), ("rb", {}), ("r", {})):
            self._check_warn_on_dealloc_fd(mode, **extra)

    def test_pickling(self):
        # Pickling file objects is forbidden
        def _variants():
            for text_mode in ("w", "r", "w+"):
                binary_mode = text_mode + "b"
                yield {"mode": text_mode}
                yield {"mode": binary_mode}
                yield {"mode": binary_mode, "buffering": 0}

        for kwargs in _variants():
            for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
                with self.open(support.TESTFN, **kwargs) as f:
                    with self.assertRaises(TypeError):
                        pickle.dumps(f, protocol)

    @unittest.skipUnless(fcntl, 'fcntl required for this test')
    def test_nonblock_pipe_write_bigbuf(self):
        self._test_nonblock_pipe_write(16*1024)

    @unittest.skipUnless(fcntl, 'fcntl required for this test')
    def test_nonblock_pipe_write_smallbuf(self):
        self._test_nonblock_pipe_write(1024)

    def _set_non_blocking(self, fd):
        """Add O_NONBLOCK to the status flags of descriptor *fd*."""
        current = fcntl.fcntl(fd, fcntl.F_GETFL)
        self.assertNotEqual(current, -1)
        outcome = fcntl.fcntl(fd, fcntl.F_SETFL, current | os.O_NONBLOCK)
        self.assertEqual(outcome, 0)

    def _test_nonblock_pipe_write(self, bufsize):
        """Write through a non-blocking pipe with buffers of *bufsize*.

        Data is written until the writer raises BlockingIOError, the pipe
        is drained, and at the end everything sent must equal everything
        received.
        """
        sent = []
        received = []
        read_fd, write_fd = os.pipe()
        self._set_non_blocking(read_fd)
        self._set_non_blocking(write_fd)

        # To exercise all code paths in the C implementation we need
        # to play with buffer sizes.  For instance, if we choose a
        # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
        # then we will never get a partial write of the buffer.
        reader = self.open(read_fd, mode='rb', closefd=True,
                           buffering=bufsize)
        writer = self.open(write_fd, mode='wb', closefd=True,
                           buffering=bufsize)

        with reader, writer:
            for size in 9999, 73, 7574:
                try:
                    # Keep writing until the pipe refuses more data.
                    for i in count():
                        chunk = bytes([i % 26 + 97]) * size
                        sent.append(chunk)
                        writer.write(chunk)

                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.args[2], e.characters_written)
                    # Only part of the last chunk was accepted.
                    sent[-1] = sent[-1][:e.characters_written]
                    received.append(reader.read())
                    marker = b'BLOCKED'
                    writer.write(marker)
                    sent.append(marker)

            # Flush what is left, draining the pipe whenever it is full.
            while True:
                try:
                    writer.flush()
                    break
                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.args[2], e.characters_written)
                    self.assertEqual(e.characters_written, 0)
                    received.append(reader.read())

            # Read until the reader reports None.
            received.extend(iter(reader.read, None))

        all_sent = b''.join(sent)
        all_received = b''.join(received)
        self.assertTrue(all_sent == all_received)
        self.assertTrue(writer.closed)
        self.assertTrue(reader.closed)

    def test_create_fail(self):
        # 'x' mode fails if file is existing
        with self.open(support.TESTFN, 'w'):
            pass
        with self.assertRaises(FileExistsError):
            self.open(support.TESTFN, 'x')

    def test_create_writes(self):
        # 'x' mode opens for writing
        with self.open(support.TESTFN, 'xb') as f:
            f.write(b"spam")
        with self.open(support.TESTFN, 'rb') as f:
            content = f.read()
            self.assertEqual(b"spam", content)

    def test_open_allargs(self):
        # there used to be a buffer overflow in the parser for rawmode
        with self.assertRaises(ValueError):
            self.open(support.TESTFN, 'rwax+')
3253
3254
class CMiscIOTest(MiscIOTest):
    # MiscIOTest run against the module bound to ``io``.
    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025: read() returning far more data than the target
        # buffer can hold must make readinto() raise ValueError.
        class BadReader(self.io.BufferedIOBase):
            def read(self, n=-1):
                return b'x' * 10**6

        reader = BadReader()
        target = bytearray(2)
        with self.assertRaises(ValueError):
            reader.readinto(target)
3266
class PyMiscIOTest(MiscIOTest):
    # MiscIOTest run against the module bound to ``pyio``.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003269
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003270
3271@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3272class SignalsTest(unittest.TestCase):
3273
3274 def setUp(self):
3275 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
3276
3277 def tearDown(self):
3278 signal.signal(signal.SIGALRM, self.oldalrm)
3279
3280 def alarm_interrupt(self, sig, frame):
3281 1/0
3282
3283 @unittest.skipUnless(threading, 'Threading required for this test.')
3284 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
3285 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00003286 invokes the signal handler, and bubbles up the exception raised
3287 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003288 read_results = []
3289 def _read():
Victor Stinnera9293352011-04-30 15:21:58 +02003290 if hasattr(signal, 'pthread_sigmask'):
3291 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003292 s = os.read(r, 1)
3293 read_results.append(s)
3294 t = threading.Thread(target=_read)
3295 t.daemon = True
3296 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00003297 fdopen_kwargs["closefd"] = False
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003298 try:
3299 wio = self.io.open(w, **fdopen_kwargs)
3300 t.start()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003301 # Fill the pipe enough that the write will be blocking.
3302 # It will be interrupted by the timer armed above. Since the
3303 # other thread has read one byte, the low-level write will
3304 # return with a successful (partial) result rather than an EINTR.
3305 # The buffered IO layer must check for pending signal
3306 # handlers, which in this case will invoke alarm_interrupt().
Victor Stinner775b2dd2013-07-15 19:53:13 +02003307 signal.alarm(1)
3308 try:
3309 self.assertRaises(ZeroDivisionError,
3310 wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
3311 finally:
3312 signal.alarm(0)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003313 t.join()
3314 # We got one byte, get another one and check that it isn't a
3315 # repeat of the first one.
3316 read_results.append(os.read(r, 1))
3317 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3318 finally:
3319 os.close(w)
3320 os.close(r)
3321 # This is deliberate. If we didn't close the file descriptor
3322 # before closing wio, wio would try to flush its internal
3323 # buffer, and block again.
3324 try:
3325 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003326 except OSError as e:
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003327 if e.errno != errno.EBADF:
3328 raise
3329
3330 def test_interrupted_write_unbuffered(self):
3331 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3332
3333 def test_interrupted_write_buffered(self):
3334 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3335
Victor Stinner6ab72862014-09-03 23:32:28 +02003336 # Issue #22331: The test hangs on FreeBSD 7.2
3337 @support.requires_freebsd_version(8)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003338 def test_interrupted_write_text(self):
3339 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3340
Brett Cannon31f59292011-02-21 19:29:56 +00003341 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003342 def check_reentrant_write(self, data, **fdopen_kwargs):
3343 def on_alarm(*args):
3344 # Will be called reentrantly from the same thread
3345 wio.write(data)
3346 1/0
3347 signal.signal(signal.SIGALRM, on_alarm)
3348 r, w = os.pipe()
3349 wio = self.io.open(w, **fdopen_kwargs)
3350 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003351 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003352 # Either the reentrant call to wio.write() fails with RuntimeError,
3353 # or the signal handler raises ZeroDivisionError.
3354 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3355 while 1:
3356 for i in range(100):
3357 wio.write(data)
3358 wio.flush()
3359 # Make sure the buffer doesn't fill up and block further writes
3360 os.read(r, len(data) * 100)
3361 exc = cm.exception
3362 if isinstance(exc, RuntimeError):
3363 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3364 finally:
3365 wio.close()
3366 os.close(r)
3367
3368 def test_reentrant_write_buffered(self):
3369 self.check_reentrant_write(b"xy", mode="wb")
3370
3371 def test_reentrant_write_text(self):
3372 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3373
Antoine Pitrou707ce822011-02-25 21:24:11 +00003374 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3375 """Check that a buffered read, when it gets interrupted (either
3376 returning a partial result or EINTR), properly invokes the signal
3377 handler and retries if the latter returned successfully."""
3378 r, w = os.pipe()
3379 fdopen_kwargs["closefd"] = False
3380 def alarm_handler(sig, frame):
3381 os.write(w, b"bar")
3382 signal.signal(signal.SIGALRM, alarm_handler)
3383 try:
3384 rio = self.io.open(r, **fdopen_kwargs)
3385 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003386 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003387 # Expected behaviour:
3388 # - first raw read() returns partial b"foo"
3389 # - second raw read() returns EINTR
3390 # - third raw read() returns b"bar"
3391 self.assertEqual(decode(rio.read(6)), "foobar")
3392 finally:
3393 rio.close()
3394 os.close(w)
3395 os.close(r)
3396
Antoine Pitrou20db5112011-08-19 20:32:34 +02003397 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003398 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3399 mode="rb")
3400
Antoine Pitrou20db5112011-08-19 20:32:34 +02003401 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003402 self.check_interrupted_read_retry(lambda x: x,
3403 mode="r")
3404
@unittest.skipUnless(threading, 'Threading required for this test.')
def check_interrupted_write_retry(self, item, **fdopen_kwargs):
    """Check that a buffered write, when it gets interrupted (either
    returning a partial result or EINTR), properly invokes the signal
    handler and retries if the latter returned successfully.

    *item* is a length-1 bytes or str object; it is repeated
    support.PIPE_MAX_SIZE times to build the payload.  *fdopen_kwargs*
    are forwarded to self.io.open() for the write end of the pipe
    ("closefd" is always forced to False).
    """
    select = support.import_module("select")
    # A quantity that exceeds the buffer size of an anonymous pipe's
    # write end.
    N = support.PIPE_MAX_SIZE
    r, w = os.pipe()
    fdopen_kwargs["closefd"] = False
    # We need a separate thread to read from the pipe and allow the
    # write() to finish.  This thread is started after the SIGALRM is
    # received (forcing a first EINTR in write()).
    read_results = []
    write_finished = False

    def _read():
        # Drain the pipe until the writer reports completion.
        while not write_finished:
            while r in select.select([r], [], [], 1.0)[0]:
                s = os.read(r, 1024)
                read_results.append(s)

    t = threading.Thread(target=_read)
    t.daemon = True

    def alarm1(sig, frame):
        # First alarm: re-arm so that a second SIGALRM hits the retry.
        signal.signal(signal.SIGALRM, alarm2)
        signal.alarm(1)

    def alarm2(sig, frame):
        # Second alarm: start draining the pipe so write() can finish.
        t.start()

    # Bound before the try block so the cleanup code can tell whether
    # open() succeeded instead of failing on an unbound name.
    wio = None
    signal.signal(signal.SIGALRM, alarm1)
    try:
        wio = self.io.open(w, **fdopen_kwargs)
        signal.alarm(1)
        # Expected behaviour:
        # - first raw write() is partial (because of the limited pipe buffer
        #   and the first alarm)
        # - second raw write() returns EINTR (because of the second alarm)
        # - subsequent write()s are successful (either partial or complete)
        self.assertEqual(N, wio.write(item * N))
        wio.flush()
        write_finished = True
        t.join()
        self.assertEqual(N, sum(len(x) for x in read_results))
    finally:
        # Cancel any alarm that is still pending (e.g. after an early
        # failure) so it cannot fire once the pipe has been closed.
        signal.alarm(0)
        write_finished = True
        os.close(w)
        os.close(r)
        # This is deliberate. If we didn't close the file descriptor
        # before closing wio, wio would try to flush its internal
        # buffer, and could block (in case of failure).
        if wio is not None:
            try:
                wio.close()
            except OSError as e:
                if e.errno != errno.EBADF:
                    raise
3459
def test_interrupted_write_retry_buffered(self):
    # Binary ("wb") variant of the interrupted-write check: the payload
    # is built from the single byte b"x".
    open_options = {"mode": "wb"}
    self.check_interrupted_write_retry(b"x", **open_options)
3462
def test_interrupted_write_retry_text(self):
    # Text ("w") variant of the interrupted-write check: the payload is
    # built from the character "x", written through a latin1 encoder.
    open_options = {"mode": "w", "encoding": "latin1"}
    self.check_interrupted_write_retry("x", **open_options)
3465
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003466
class CSignalsTest(SignalsTest):
    # Run the SignalsTest cases against the C implementation (io).
    io = io
3469
class PySignalsTest(SignalsTest):
    # Run the SignalsTest cases against the Python implementation (pyio).
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # reentrant-write tests are disabled by overriding them with None.
    test_reentrant_write_buffered = test_reentrant_write_text = None
3477
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003478
def load_tests(*args):
    # Build the suite for this module.  Before the suite is assembled,
    # every "C..." / "Py..." test class receives, as class attributes,
    # the public names of the matching io implementation plus the
    # matching flavour of each mock class.
    case_classes = (
        CIOTest, PyIOTest,
        CBufferedReaderTest, PyBufferedReaderTest,
        CBufferedWriterTest, PyBufferedWriterTest,
        CBufferedRWPairTest, PyBufferedRWPairTest,
        CBufferedRandomTest, PyBufferedRandomTest,
        StatefulIncrementalDecoderTest,
        CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
        CTextIOWrapperTest, PyTextIOWrapperTest,
        CMiscIOTest, PyMiscIOTest,
        CSignalsTest, PySignalsTest,
    )

    # Put the namespaces of the IO module we are testing and some useful
    # mock classes in the __dict__ of each test.
    mock_classes = (
        MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
        MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
    )
    exported = io.__all__ + ["IncrementalNewlineDecoder"]
    module_globals = globals()

    c_namespace = {}
    py_namespace = {}
    for attr in exported:
        c_namespace[attr] = getattr(io, attr)
        py_namespace[attr] = getattr(pyio, attr)
    for mock in mock_classes:
        # Each mock exists in a "C"-prefixed and a "Py"-prefixed flavour
        # at module level; expose it under its unprefixed name.
        c_namespace[mock.__name__] = module_globals["C" + mock.__name__]
        py_namespace[mock.__name__] = module_globals["Py" + mock.__name__]
    # Avoid turning open into a bound method.
    py_namespace["open"] = pyio.OpenWrapper

    for case in case_classes:
        if case.__name__.startswith("C"):
            namespace = c_namespace
        elif case.__name__.startswith("Py"):
            namespace = py_namespace
        else:
            # Implementation-independent test class: nothing to inject.
            continue
        for attr, value in namespace.items():
            setattr(case, attr, value)

    suite = unittest.TestSuite()
    for case in case_classes:
        suite.addTest(unittest.makeSuite(case))
    return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00003514
if __name__ == "__main__":
    # Run this module's tests when it is executed as a script.
    unittest.main()