blob: 8e702db194d4702c0f5045a7a8eecddbaef643cb [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Antoine Pitrou712cb732013-12-21 15:51:54 +010038from test.script_helper import assert_python_ok
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010047try:
48 import fcntl
49except ImportError:
50 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000051
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000052def _default_chunk_size():
53 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000054 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000055 return f._CHUNK_SIZE
56
57
Antoine Pitrou328ec742010-09-14 18:37:24 +000058class MockRawIOWithoutRead:
59 """A RawIO implementation without read(), so as to exercise the default
60 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000061
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000062 def __init__(self, read_stack=()):
63 self._read_stack = list(read_stack)
64 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000065 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000066 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000067
Guido van Rossum01a27522007-03-07 01:00:12 +000068 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000069 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000070 return len(b)
71
72 def writable(self):
73 return True
74
Guido van Rossum68bbcd22007-02-27 17:19:33 +000075 def fileno(self):
76 return 42
77
78 def readable(self):
79 return True
80
Guido van Rossum01a27522007-03-07 01:00:12 +000081 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000082 return True
83
Guido van Rossum01a27522007-03-07 01:00:12 +000084 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000085 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000086
87 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000088 return 0 # same comment as above
89
90 def readinto(self, buf):
91 self._reads += 1
92 max_len = len(buf)
93 try:
94 data = self._read_stack[0]
95 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +000096 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000097 return 0
98 if data is None:
99 del self._read_stack[0]
100 return None
101 n = len(data)
102 if len(data) <= max_len:
103 del self._read_stack[0]
104 buf[:n] = data
105 return n
106 else:
107 buf[:] = data[:max_len]
108 self._read_stack[0] = data[max_len:]
109 return max_len
110
111 def truncate(self, pos=None):
112 return pos
113
Antoine Pitrou328ec742010-09-14 18:37:24 +0000114class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
115 pass
116
117class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
118 pass
119
120
121class MockRawIO(MockRawIOWithoutRead):
122
123 def read(self, n=None):
124 self._reads += 1
125 try:
126 return self._read_stack.pop(0)
127 except:
128 self._extraneous_reads += 1
129 return b""
130
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000131class CMockRawIO(MockRawIO, io.RawIOBase):
132 pass
133
134class PyMockRawIO(MockRawIO, pyio.RawIOBase):
135 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000136
Guido van Rossuma9e20242007-03-08 00:43:48 +0000137
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000138class MisbehavedRawIO(MockRawIO):
139 def write(self, b):
140 return super().write(b) * 2
141
142 def read(self, n=None):
143 return super().read(n) * 2
144
145 def seek(self, pos, whence):
146 return -123
147
148 def tell(self):
149 return -456
150
151 def readinto(self, buf):
152 super().readinto(buf)
153 return len(buf) * 5
154
155class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
156 pass
157
158class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
159 pass
160
161
162class CloseFailureIO(MockRawIO):
163 closed = 0
164
165 def close(self):
166 if not self.closed:
167 self.closed = 1
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200168 raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000169
170class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
171 pass
172
173class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
174 pass
175
176
177class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000178
179 def __init__(self, data):
180 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000181 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000182
183 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000184 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000185 self.read_history.append(None if res is None else len(res))
186 return res
187
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000188 def readinto(self, b):
189 res = super().readinto(b)
190 self.read_history.append(res)
191 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000192
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000193class CMockFileIO(MockFileIO, io.BytesIO):
194 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000195
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000196class PyMockFileIO(MockFileIO, pyio.BytesIO):
197 pass
198
199
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000200class MockUnseekableIO:
201 def seekable(self):
202 return False
203
204 def seek(self, *args):
205 raise self.UnsupportedOperation("not seekable")
206
207 def tell(self, *args):
208 raise self.UnsupportedOperation("not seekable")
209
210class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
211 UnsupportedOperation = io.UnsupportedOperation
212
213class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
214 UnsupportedOperation = pyio.UnsupportedOperation
215
216
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000217class MockNonBlockWriterIO:
218
219 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000220 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000221 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000222
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000223 def pop_written(self):
224 s = b"".join(self._write_stack)
225 self._write_stack[:] = []
226 return s
227
228 def block_on(self, char):
229 """Block when a given char is encountered."""
230 self._blocker_char = char
231
232 def readable(self):
233 return True
234
235 def seekable(self):
236 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000237
Guido van Rossum01a27522007-03-07 01:00:12 +0000238 def writable(self):
239 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000240
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000241 def write(self, b):
242 b = bytes(b)
243 n = -1
244 if self._blocker_char:
245 try:
246 n = b.index(self._blocker_char)
247 except ValueError:
248 pass
249 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100250 if n > 0:
251 # write data up to the first blocker
252 self._write_stack.append(b[:n])
253 return n
254 else:
255 # cancel blocker and indicate would block
256 self._blocker_char = None
257 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000258 self._write_stack.append(b)
259 return len(b)
260
261class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
262 BlockingIOError = io.BlockingIOError
263
264class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
265 BlockingIOError = pyio.BlockingIOError
266
Guido van Rossuma9e20242007-03-08 00:43:48 +0000267
Guido van Rossum28524c72007-02-27 05:47:44 +0000268class IOTest(unittest.TestCase):
269
Neal Norwitze7789b12008-03-24 06:18:09 +0000270 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000271 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000272
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000273 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000274 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000275
Guido van Rossum28524c72007-02-27 05:47:44 +0000276 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000277 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000278 f.truncate(0)
279 self.assertEqual(f.tell(), 5)
280 f.seek(0)
281
282 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000283 self.assertEqual(f.seek(0), 0)
284 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000285 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000286 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000287 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000288 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000289 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000290 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000291 self.assertEqual(f.seek(-1, 2), 13)
292 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000293
Guido van Rossum87429772007-04-10 21:06:59 +0000294 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000295 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000296 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000297
Guido van Rossum9b76da62007-04-11 01:09:03 +0000298 def read_ops(self, f, buffered=False):
299 data = f.read(5)
300 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000301 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000302 self.assertEqual(f.readinto(data), 5)
303 self.assertEqual(data, b" worl")
304 self.assertEqual(f.readinto(data), 2)
305 self.assertEqual(len(data), 5)
306 self.assertEqual(data[:2], b"d\n")
307 self.assertEqual(f.seek(0), 0)
308 self.assertEqual(f.read(20), b"hello world\n")
309 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000310 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000311 self.assertEqual(f.seek(-6, 2), 6)
312 self.assertEqual(f.read(5), b"world")
313 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000314 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000315 self.assertEqual(f.seek(-6, 1), 5)
316 self.assertEqual(f.read(5), b" worl")
317 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000318 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000319 if buffered:
320 f.seek(0)
321 self.assertEqual(f.read(), b"hello world\n")
322 f.seek(6)
323 self.assertEqual(f.read(), b"world\n")
324 self.assertEqual(f.read(), b"")
325
Guido van Rossum34d69e52007-04-10 20:08:41 +0000326 LARGE = 2**31
327
Guido van Rossum53807da2007-04-10 19:01:47 +0000328 def large_file_ops(self, f):
329 assert f.readable()
330 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000331 self.assertEqual(f.seek(self.LARGE), self.LARGE)
332 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000333 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000334 self.assertEqual(f.tell(), self.LARGE + 3)
335 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000336 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000337 self.assertEqual(f.tell(), self.LARGE + 2)
338 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000339 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000340 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000341 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
342 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000343 self.assertEqual(f.read(2), b"x")
344
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000345 def test_invalid_operations(self):
346 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000347 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000348 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000349 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000350 self.assertRaises(exc, fp.read)
351 self.assertRaises(exc, fp.readline)
352 with self.open(support.TESTFN, "wb", buffering=0) as fp:
353 self.assertRaises(exc, fp.read)
354 self.assertRaises(exc, fp.readline)
355 with self.open(support.TESTFN, "rb", buffering=0) as fp:
356 self.assertRaises(exc, fp.write, b"blah")
357 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000358 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000359 self.assertRaises(exc, fp.write, b"blah")
360 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000361 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000362 self.assertRaises(exc, fp.write, "blah")
363 self.assertRaises(exc, fp.writelines, ["blah\n"])
364 # Non-zero seeking from current or end pos
365 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
366 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000367
Antoine Pitrou13348842012-01-29 18:36:34 +0100368 def test_open_handles_NUL_chars(self):
369 fn_with_NUL = 'foo\0bar'
370 self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
371 self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')
372
Guido van Rossum28524c72007-02-27 05:47:44 +0000373 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000374 with self.open(support.TESTFN, "wb", buffering=0) as f:
375 self.assertEqual(f.readable(), False)
376 self.assertEqual(f.writable(), True)
377 self.assertEqual(f.seekable(), True)
378 self.write_ops(f)
379 with self.open(support.TESTFN, "rb", buffering=0) as f:
380 self.assertEqual(f.readable(), True)
381 self.assertEqual(f.writable(), False)
382 self.assertEqual(f.seekable(), True)
383 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000384
Guido van Rossum87429772007-04-10 21:06:59 +0000385 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000386 with self.open(support.TESTFN, "wb") as f:
387 self.assertEqual(f.readable(), False)
388 self.assertEqual(f.writable(), True)
389 self.assertEqual(f.seekable(), True)
390 self.write_ops(f)
391 with self.open(support.TESTFN, "rb") as f:
392 self.assertEqual(f.readable(), True)
393 self.assertEqual(f.writable(), False)
394 self.assertEqual(f.seekable(), True)
395 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000396
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000397 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000398 with self.open(support.TESTFN, "wb") as f:
399 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
400 with self.open(support.TESTFN, "rb") as f:
401 self.assertEqual(f.readline(), b"abc\n")
402 self.assertEqual(f.readline(10), b"def\n")
403 self.assertEqual(f.readline(2), b"xy")
404 self.assertEqual(f.readline(4), b"zzy\n")
405 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000406 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000407 self.assertRaises(TypeError, f.readline, 5.3)
408 with self.open(support.TESTFN, "r") as f:
409 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000410
Guido van Rossum28524c72007-02-27 05:47:44 +0000411 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000412 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000413 self.write_ops(f)
414 data = f.getvalue()
415 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000416 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000417 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000418
Guido van Rossum53807da2007-04-10 19:01:47 +0000419 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000420 # On Windows and Mac OSX this test comsumes large resources; It takes
421 # a long time to build the >2GB file and takes >2GB of disk space
422 # therefore the resource must be enabled to run this test.
423 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600424 support.requires(
425 'largefile',
426 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000427 with self.open(support.TESTFN, "w+b", 0) as f:
428 self.large_file_ops(f)
429 with self.open(support.TESTFN, "w+b") as f:
430 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000431
432 def test_with_open(self):
433 for bufsize in (0, 1, 100):
434 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000435 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000436 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000437 self.assertEqual(f.closed, True)
438 f = None
439 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000440 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000441 1/0
442 except ZeroDivisionError:
443 self.assertEqual(f.closed, True)
444 else:
445 self.fail("1/0 didn't raise an exception")
446
Antoine Pitrou08838b62009-01-21 00:55:13 +0000447 # issue 5008
448 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000449 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000450 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000451 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000452 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000453 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000454 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000455 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000456 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000457
Guido van Rossum87429772007-04-10 21:06:59 +0000458 def test_destructor(self):
459 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000460 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000461 def __del__(self):
462 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000463 try:
464 f = super().__del__
465 except AttributeError:
466 pass
467 else:
468 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000469 def close(self):
470 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000471 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000472 def flush(self):
473 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000474 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000475 with support.check_warnings(('', ResourceWarning)):
476 f = MyFileIO(support.TESTFN, "wb")
477 f.write(b"xxx")
478 del f
479 support.gc_collect()
480 self.assertEqual(record, [1, 2, 3])
481 with self.open(support.TESTFN, "rb") as f:
482 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000483
484 def _check_base_destructor(self, base):
485 record = []
486 class MyIO(base):
487 def __init__(self):
488 # This exercises the availability of attributes on object
489 # destruction.
490 # (in the C version, close() is called by the tp_dealloc
491 # function, not by __del__)
492 self.on_del = 1
493 self.on_close = 2
494 self.on_flush = 3
495 def __del__(self):
496 record.append(self.on_del)
497 try:
498 f = super().__del__
499 except AttributeError:
500 pass
501 else:
502 f()
503 def close(self):
504 record.append(self.on_close)
505 super().close()
506 def flush(self):
507 record.append(self.on_flush)
508 super().flush()
509 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000510 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000511 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000512 self.assertEqual(record, [1, 2, 3])
513
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000514 def test_IOBase_destructor(self):
515 self._check_base_destructor(self.IOBase)
516
517 def test_RawIOBase_destructor(self):
518 self._check_base_destructor(self.RawIOBase)
519
520 def test_BufferedIOBase_destructor(self):
521 self._check_base_destructor(self.BufferedIOBase)
522
523 def test_TextIOBase_destructor(self):
524 self._check_base_destructor(self.TextIOBase)
525
Guido van Rossum87429772007-04-10 21:06:59 +0000526 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000527 with self.open(support.TESTFN, "wb") as f:
528 f.write(b"xxx")
529 with self.open(support.TESTFN, "rb") as f:
530 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000531
Guido van Rossumd4103952007-04-12 05:44:49 +0000532 def test_array_writes(self):
533 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000534 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000535 with self.open(support.TESTFN, "wb", 0) as f:
536 self.assertEqual(f.write(a), n)
537 with self.open(support.TESTFN, "wb") as f:
538 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000539
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000540 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000541 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000542 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000543
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000544 def test_read_closed(self):
545 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000546 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000547 with self.open(support.TESTFN, "r") as f:
548 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000549 self.assertEqual(file.read(), "egg\n")
550 file.seek(0)
551 file.close()
552 self.assertRaises(ValueError, file.read)
553
554 def test_no_closefd_with_filename(self):
555 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000556 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000557
558 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000559 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000560 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000561 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000562 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000563 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000564 self.assertEqual(file.buffer.raw.closefd, False)
565
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000566 def test_garbage_collection(self):
567 # FileIO objects are collected, and collecting them flushes
568 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000569 with support.check_warnings(('', ResourceWarning)):
570 f = self.FileIO(support.TESTFN, "wb")
571 f.write(b"abcxxx")
572 f.f = f
573 wr = weakref.ref(f)
574 del f
575 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000576 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000577 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000578 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000579
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000580 def test_unbounded_file(self):
581 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
582 zero = "/dev/zero"
583 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000584 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000585 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000586 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000587 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000588 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000589 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000590 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000591 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000592 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000593 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000594 self.assertRaises(OverflowError, f.read)
595
Antoine Pitrou6be88762010-05-03 16:48:20 +0000596 def test_flush_error_on_close(self):
597 f = self.open(support.TESTFN, "wb", buffering=0)
598 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200599 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000600 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200601 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600602 self.assertTrue(f.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000603
604 def test_multi_close(self):
605 f = self.open(support.TESTFN, "wb", buffering=0)
606 f.close()
607 f.close()
608 f.close()
609 self.assertRaises(ValueError, f.flush)
610
Antoine Pitrou328ec742010-09-14 18:37:24 +0000611 def test_RawIOBase_read(self):
612 # Exercise the default RawIOBase.read() implementation (which calls
613 # readinto() internally).
614 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
615 self.assertEqual(rawio.read(2), b"ab")
616 self.assertEqual(rawio.read(2), b"c")
617 self.assertEqual(rawio.read(2), b"d")
618 self.assertEqual(rawio.read(2), None)
619 self.assertEqual(rawio.read(2), b"ef")
620 self.assertEqual(rawio.read(2), b"g")
621 self.assertEqual(rawio.read(2), None)
622 self.assertEqual(rawio.read(2), b"")
623
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400624 def test_types_have_dict(self):
625 test = (
626 self.IOBase(),
627 self.RawIOBase(),
628 self.TextIOBase(),
629 self.StringIO(),
630 self.BytesIO()
631 )
632 for obj in test:
633 self.assertTrue(hasattr(obj, "__dict__"))
634
Ross Lagerwall59142db2011-10-31 20:34:46 +0200635 def test_opener(self):
636 with self.open(support.TESTFN, "w") as f:
637 f.write("egg\n")
638 fd = os.open(support.TESTFN, os.O_RDONLY)
639 def opener(path, flags):
640 return fd
641 with self.open("non-existent", "r", opener=opener) as f:
642 self.assertEqual(f.read(), "egg\n")
643
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200644 def test_fileio_closefd(self):
645 # Issue #4841
646 with self.open(__file__, 'rb') as f1, \
647 self.open(__file__, 'rb') as f2:
648 fileio = self.FileIO(f1.fileno(), closefd=False)
649 # .__init__() must not close f1
650 fileio.__init__(f2.fileno(), closefd=False)
651 f1.readline()
652 # .close() must not close f2
653 fileio.close()
654 f2.readline()
655
656
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000657class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +0200658
659 def test_IOBase_finalize(self):
660 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
661 # class which inherits IOBase and an object of this class are caught
662 # in a reference cycle and close() is already in the method cache.
663 class MyIO(self.IOBase):
664 def close(self):
665 pass
666
667 # create an instance to populate the method cache
668 MyIO()
669 obj = MyIO()
670 obj.obj = obj
671 wr = weakref.ref(obj)
672 del MyIO
673 del obj
674 support.gc_collect()
675 self.assertTrue(wr() is None, wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000676
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000677class PyIOTest(IOTest):
678 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000679
Guido van Rossuma9e20242007-03-08 00:43:48 +0000680
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000681class CommonBufferedTests:
682 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
683
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000684 def test_detach(self):
685 raw = self.MockRawIO()
686 buf = self.tp(raw)
687 self.assertIs(buf.detach(), raw)
688 self.assertRaises(ValueError, buf.detach)
689
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000690 def test_fileno(self):
691 rawio = self.MockRawIO()
692 bufio = self.tp(rawio)
693
Ezio Melottib3aedd42010-11-20 19:04:17 +0000694 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000695
Zachary Ware9fe6d862013-12-08 00:20:35 -0600696 @unittest.skip('test having existential crisis')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000697 def test_no_fileno(self):
698 # XXX will we always have fileno() function? If so, kill
699 # this test. Else, write it.
700 pass
701
702 def test_invalid_args(self):
703 rawio = self.MockRawIO()
704 bufio = self.tp(rawio)
705 # Invalid whence
706 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +0200707 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000708
709 def test_override_destructor(self):
710 tp = self.tp
711 record = []
712 class MyBufferedIO(tp):
713 def __del__(self):
714 record.append(1)
715 try:
716 f = super().__del__
717 except AttributeError:
718 pass
719 else:
720 f()
721 def close(self):
722 record.append(2)
723 super().close()
724 def flush(self):
725 record.append(3)
726 super().flush()
727 rawio = self.MockRawIO()
728 bufio = MyBufferedIO(rawio)
729 writable = bufio.writable()
730 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000731 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000732 if writable:
733 self.assertEqual(record, [1, 2, 3])
734 else:
735 self.assertEqual(record, [1, 2])
736
737 def test_context_manager(self):
738 # Test usability as a context manager
739 rawio = self.MockRawIO()
740 bufio = self.tp(rawio)
741 def _with():
742 with bufio:
743 pass
744 _with()
745 # bufio should now be closed, and using it a second time should raise
746 # a ValueError.
747 self.assertRaises(ValueError, _with)
748
749 def test_error_through_destructor(self):
750 # Test that the exception state is not modified by a destructor,
751 # even if close() fails.
752 rawio = self.CloseFailureIO()
753 def f():
754 self.tp(rawio).xyzzy
755 with support.captured_output("stderr") as s:
756 self.assertRaises(AttributeError, f)
757 s = s.getvalue().strip()
758 if s:
759 # The destructor *may* have printed an unraisable error, check it
760 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200761 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000762 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000763
Antoine Pitrou716c4442009-05-23 19:04:03 +0000764 def test_repr(self):
765 raw = self.MockRawIO()
766 b = self.tp(raw)
767 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
768 self.assertEqual(repr(b), "<%s>" % clsname)
769 raw.name = "dummy"
770 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
771 raw.name = b"dummy"
772 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
773
Antoine Pitrou6be88762010-05-03 16:48:20 +0000774 def test_flush_error_on_close(self):
775 raw = self.MockRawIO()
776 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200777 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000778 raw.flush = bad_flush
779 b = self.tp(raw)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200780 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600781 self.assertTrue(b.closed)
782
783 def test_close_error_on_close(self):
784 raw = self.MockRawIO()
785 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200786 raise OSError('flush')
Benjamin Peterson68623612012-12-20 11:53:11 -0600787 def bad_close():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200788 raise OSError('close')
Benjamin Peterson68623612012-12-20 11:53:11 -0600789 raw.close = bad_close
790 b = self.tp(raw)
791 b.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200792 with self.assertRaises(OSError) as err: # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600793 b.close()
794 self.assertEqual(err.exception.args, ('close',))
795 self.assertEqual(err.exception.__context__.args, ('flush',))
796 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000797
798 def test_multi_close(self):
799 raw = self.MockRawIO()
800 b = self.tp(raw)
801 b.close()
802 b.close()
803 b.close()
804 self.assertRaises(ValueError, b.flush)
805
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000806 def test_unseekable(self):
807 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
808 self.assertRaises(self.UnsupportedOperation, bufio.tell)
809 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
810
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000811 def test_readonly_attributes(self):
812 raw = self.MockRawIO()
813 buf = self.tp(raw)
814 x = self.MockRawIO()
815 with self.assertRaises(AttributeError):
816 buf.raw = x
817
Guido van Rossum78892e42007-04-06 17:31:18 +0000818
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200819class SizeofTest:
820
821 @support.cpython_only
822 def test_sizeof(self):
823 bufsize1 = 4096
824 bufsize2 = 8192
825 rawio = self.MockRawIO()
826 bufio = self.tp(rawio, buffer_size=bufsize1)
827 size = sys.getsizeof(bufio) - bufsize1
828 rawio = self.MockRawIO()
829 bufio = self.tp(rawio, buffer_size=bufsize2)
830 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
831
Jesus Ceadc469452012-10-04 12:37:56 +0200832 @support.cpython_only
833 def test_buffer_freeing(self) :
834 bufsize = 4096
835 rawio = self.MockRawIO()
836 bufio = self.tp(rawio, buffer_size=bufsize)
837 size = sys.getsizeof(bufio) - bufsize
838 bufio.close()
839 self.assertEqual(sys.getsizeof(bufio), size)
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200840
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000841class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
842 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000843
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000844 def test_constructor(self):
845 rawio = self.MockRawIO([b"abc"])
846 bufio = self.tp(rawio)
847 bufio.__init__(rawio)
848 bufio.__init__(rawio, buffer_size=1024)
849 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000850 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000851 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
852 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
853 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
854 rawio = self.MockRawIO([b"abc"])
855 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000856 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000857
Serhiy Storchaka61e24932014-02-12 10:52:35 +0200858 def test_uninitialized(self):
859 bufio = self.tp.__new__(self.tp)
860 del bufio
861 bufio = self.tp.__new__(self.tp)
862 self.assertRaisesRegex((ValueError, AttributeError),
863 'uninitialized|has no attribute',
864 bufio.read, 0)
865 bufio.__init__(self.MockRawIO())
866 self.assertEqual(bufio.read(0), b'')
867
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000868 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000869 for arg in (None, 7):
870 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
871 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000872 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000873 # Invalid args
874 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000875
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000876 def test_read1(self):
877 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
878 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000879 self.assertEqual(b"a", bufio.read(1))
880 self.assertEqual(b"b", bufio.read1(1))
881 self.assertEqual(rawio._reads, 1)
882 self.assertEqual(b"c", bufio.read1(100))
883 self.assertEqual(rawio._reads, 1)
884 self.assertEqual(b"d", bufio.read1(100))
885 self.assertEqual(rawio._reads, 2)
886 self.assertEqual(b"efg", bufio.read1(100))
887 self.assertEqual(rawio._reads, 3)
888 self.assertEqual(b"", bufio.read1(100))
889 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000890 # Invalid args
891 self.assertRaises(ValueError, bufio.read1, -1)
892
893 def test_readinto(self):
894 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
895 bufio = self.tp(rawio)
896 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000897 self.assertEqual(bufio.readinto(b), 2)
898 self.assertEqual(b, b"ab")
899 self.assertEqual(bufio.readinto(b), 2)
900 self.assertEqual(b, b"cd")
901 self.assertEqual(bufio.readinto(b), 2)
902 self.assertEqual(b, b"ef")
903 self.assertEqual(bufio.readinto(b), 1)
904 self.assertEqual(b, b"gf")
905 self.assertEqual(bufio.readinto(b), 0)
906 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +0200907 rawio = self.MockRawIO((b"abc", None))
908 bufio = self.tp(rawio)
909 self.assertEqual(bufio.readinto(b), 2)
910 self.assertEqual(b, b"ab")
911 self.assertEqual(bufio.readinto(b), 1)
912 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000913
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000914 def test_readlines(self):
915 def bufio():
916 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
917 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000918 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
919 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
920 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000921
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000922 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000923 data = b"abcdefghi"
924 dlen = len(data)
925
926 tests = [
927 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
928 [ 100, [ 3, 3, 3], [ dlen ] ],
929 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
930 ]
931
932 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000933 rawio = self.MockFileIO(data)
934 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000935 pos = 0
936 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +0000937 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000938 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000939 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +0000940 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000941
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000942 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000943 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000944 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
945 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000946 self.assertEqual(b"abcd", bufio.read(6))
947 self.assertEqual(b"e", bufio.read(1))
948 self.assertEqual(b"fg", bufio.read())
949 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +0200950 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000951 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000952
Victor Stinnera80987f2011-05-25 22:47:16 +0200953 rawio = self.MockRawIO((b"a", None, None))
954 self.assertEqual(b"a", rawio.readall())
955 self.assertIsNone(rawio.readall())
956
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000957 def test_read_past_eof(self):
958 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
959 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000960
Ezio Melottib3aedd42010-11-20 19:04:17 +0000961 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000962
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000963 def test_read_all(self):
964 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
965 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000966
Ezio Melottib3aedd42010-11-20 19:04:17 +0000967 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000968
Victor Stinner45df8202010-04-28 22:31:17 +0000969 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +0000970 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000971 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000972 try:
973 # Write out many bytes with exactly the same number of 0's,
974 # 1's... 255's. This will help us check that concurrent reading
975 # doesn't duplicate or forget contents.
976 N = 1000
977 l = list(range(256)) * N
978 random.shuffle(l)
979 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000980 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +0000981 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000982 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000983 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000984 errors = []
985 results = []
986 def f():
987 try:
988 # Intra-buffer read then buffer-flushing read
989 for n in cycle([1, 19]):
990 s = bufio.read(n)
991 if not s:
992 break
993 # list.append() is atomic
994 results.append(s)
995 except Exception as e:
996 errors.append(e)
997 raise
998 threads = [threading.Thread(target=f) for x in range(20)]
999 for t in threads:
1000 t.start()
1001 time.sleep(0.02) # yield
1002 for t in threads:
1003 t.join()
1004 self.assertFalse(errors,
1005 "the following exceptions were caught: %r" % errors)
1006 s = b''.join(results)
1007 for i in range(256):
1008 c = bytes(bytearray([i]))
1009 self.assertEqual(s.count(c), N)
1010 finally:
1011 support.unlink(support.TESTFN)
1012
Antoine Pitrou1e44fec2011-10-04 12:26:20 +02001013 def test_unseekable(self):
1014 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1015 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1016 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1017 bufio.read(1)
1018 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1019 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1020
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001021 def test_misbehaved_io(self):
1022 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1023 bufio = self.tp(rawio)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001024 self.assertRaises(OSError, bufio.seek, 0)
1025 self.assertRaises(OSError, bufio.tell)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001026
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001027 def test_no_extraneous_read(self):
1028 # Issue #9550; when the raw IO object has satisfied the read request,
1029 # we should not issue any additional reads, otherwise it may block
1030 # (e.g. socket).
1031 bufsize = 16
1032 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1033 rawio = self.MockRawIO([b"x" * n])
1034 bufio = self.tp(rawio, bufsize)
1035 self.assertEqual(bufio.read(n), b"x" * n)
1036 # Simple case: one raw read is enough to satisfy the request.
1037 self.assertEqual(rawio._extraneous_reads, 0,
1038 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1039 # A more complex case where two raw reads are needed to satisfy
1040 # the request.
1041 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1042 bufio = self.tp(rawio, bufsize)
1043 self.assertEqual(bufio.read(n), b"x" * n)
1044 self.assertEqual(rawio._extraneous_reads, 0,
1045 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1046
1047
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001048class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001049 tp = io.BufferedReader
1050
1051 def test_constructor(self):
1052 BufferedReaderTest.test_constructor(self)
1053 # The allocation can succeed on 32-bit builds, e.g. with more
1054 # than 2GB RAM and a 64-bit kernel.
1055 if sys.maxsize > 0x7FFFFFFF:
1056 rawio = self.MockRawIO()
1057 bufio = self.tp(rawio)
1058 self.assertRaises((OverflowError, MemoryError, ValueError),
1059 bufio.__init__, rawio, sys.maxsize)
1060
1061 def test_initialization(self):
1062 rawio = self.MockRawIO([b"abc"])
1063 bufio = self.tp(rawio)
1064 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1065 self.assertRaises(ValueError, bufio.read)
1066 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1067 self.assertRaises(ValueError, bufio.read)
1068 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1069 self.assertRaises(ValueError, bufio.read)
1070
1071 def test_misbehaved_io_read(self):
1072 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1073 bufio = self.tp(rawio)
1074 # _pyio.BufferedReader seems to implement reading different, so that
1075 # checking this is not so easy.
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001076 self.assertRaises(OSError, bufio.read, 10)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001077
1078 def test_garbage_collection(self):
1079 # C BufferedReader objects are collected.
1080 # The Python version has __del__, so it ends into gc.garbage instead
Antoine Pitrou796564c2013-07-30 19:59:21 +02001081 with support.check_warnings(('', ResourceWarning)):
1082 rawio = self.FileIO(support.TESTFN, "w+b")
1083 f = self.tp(rawio)
1084 f.f = f
1085 wr = weakref.ref(f)
1086 del f
1087 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001088 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001089
R David Murray67bfe802013-02-23 21:51:05 -05001090 def test_args_error(self):
1091 # Issue #17275
1092 with self.assertRaisesRegex(TypeError, "BufferedReader"):
1093 self.tp(io.BytesIO(), 1024, 1024, 1024)
1094
1095
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001096class PyBufferedReaderTest(BufferedReaderTest):
1097 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001098
Guido van Rossuma9e20242007-03-08 00:43:48 +00001099
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001100class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1101 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001102
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001103 def test_constructor(self):
1104 rawio = self.MockRawIO()
1105 bufio = self.tp(rawio)
1106 bufio.__init__(rawio)
1107 bufio.__init__(rawio, buffer_size=1024)
1108 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001109 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001110 bufio.flush()
1111 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1112 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1113 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1114 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001115 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001116 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001117 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001118
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001119 def test_uninitialized(self):
1120 bufio = self.tp.__new__(self.tp)
1121 del bufio
1122 bufio = self.tp.__new__(self.tp)
1123 self.assertRaisesRegex((ValueError, AttributeError),
1124 'uninitialized|has no attribute',
1125 bufio.write, b'')
1126 bufio.__init__(self.MockRawIO())
1127 self.assertEqual(bufio.write(b''), 0)
1128
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001129 def test_detach_flush(self):
1130 raw = self.MockRawIO()
1131 buf = self.tp(raw)
1132 buf.write(b"howdy!")
1133 self.assertFalse(raw._write_stack)
1134 buf.detach()
1135 self.assertEqual(raw._write_stack, [b"howdy!"])
1136
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001137 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001138 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001139 writer = self.MockRawIO()
1140 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001141 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001142 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001143
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001144 def test_write_overflow(self):
1145 writer = self.MockRawIO()
1146 bufio = self.tp(writer, 8)
1147 contents = b"abcdefghijklmnop"
1148 for n in range(0, len(contents), 3):
1149 bufio.write(contents[n:n+3])
1150 flushed = b"".join(writer._write_stack)
1151 # At least (total - 8) bytes were implicitly flushed, perhaps more
1152 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001153 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001154
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001155 def check_writes(self, intermediate_func):
1156 # Lots of writes, test the flushed output is as expected.
1157 contents = bytes(range(256)) * 1000
1158 n = 0
1159 writer = self.MockRawIO()
1160 bufio = self.tp(writer, 13)
1161 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1162 def gen_sizes():
1163 for size in count(1):
1164 for i in range(15):
1165 yield size
1166 sizes = gen_sizes()
1167 while n < len(contents):
1168 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001169 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001170 intermediate_func(bufio)
1171 n += size
1172 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001173 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001174
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001175 def test_writes(self):
1176 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001177
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001178 def test_writes_and_flushes(self):
1179 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001180
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001181 def test_writes_and_seeks(self):
1182 def _seekabs(bufio):
1183 pos = bufio.tell()
1184 bufio.seek(pos + 1, 0)
1185 bufio.seek(pos - 1, 0)
1186 bufio.seek(pos, 0)
1187 self.check_writes(_seekabs)
1188 def _seekrel(bufio):
1189 pos = bufio.seek(0, 1)
1190 bufio.seek(+1, 1)
1191 bufio.seek(-1, 1)
1192 bufio.seek(pos, 0)
1193 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001194
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001195 def test_writes_and_truncates(self):
1196 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001197
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001198 def test_write_non_blocking(self):
1199 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001200 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001201
Ezio Melottib3aedd42010-11-20 19:04:17 +00001202 self.assertEqual(bufio.write(b"abcd"), 4)
1203 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001204 # 1 byte will be written, the rest will be buffered
1205 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001206 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001207
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001208 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1209 raw.block_on(b"0")
1210 try:
1211 bufio.write(b"opqrwxyz0123456789")
1212 except self.BlockingIOError as e:
1213 written = e.characters_written
1214 else:
1215 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001216 self.assertEqual(written, 16)
1217 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001218 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001219
Ezio Melottib3aedd42010-11-20 19:04:17 +00001220 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001221 s = raw.pop_written()
1222 # Previously buffered bytes were flushed
1223 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001224
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001225 def test_write_and_rewind(self):
1226 raw = io.BytesIO()
1227 bufio = self.tp(raw, 4)
1228 self.assertEqual(bufio.write(b"abcdef"), 6)
1229 self.assertEqual(bufio.tell(), 6)
1230 bufio.seek(0, 0)
1231 self.assertEqual(bufio.write(b"XY"), 2)
1232 bufio.seek(6, 0)
1233 self.assertEqual(raw.getvalue(), b"XYcdef")
1234 self.assertEqual(bufio.write(b"123456"), 6)
1235 bufio.flush()
1236 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001237
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001238 def test_flush(self):
1239 writer = self.MockRawIO()
1240 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001241 bufio.write(b"abc")
1242 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001243 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001244
Antoine Pitrou131a4892012-10-16 22:57:11 +02001245 def test_writelines(self):
1246 l = [b'ab', b'cd', b'ef']
1247 writer = self.MockRawIO()
1248 bufio = self.tp(writer, 8)
1249 bufio.writelines(l)
1250 bufio.flush()
1251 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1252
1253 def test_writelines_userlist(self):
1254 l = UserList([b'ab', b'cd', b'ef'])
1255 writer = self.MockRawIO()
1256 bufio = self.tp(writer, 8)
1257 bufio.writelines(l)
1258 bufio.flush()
1259 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1260
1261 def test_writelines_error(self):
1262 writer = self.MockRawIO()
1263 bufio = self.tp(writer, 8)
1264 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1265 self.assertRaises(TypeError, bufio.writelines, None)
1266 self.assertRaises(TypeError, bufio.writelines, 'abc')
1267
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001268 def test_destructor(self):
1269 writer = self.MockRawIO()
1270 bufio = self.tp(writer, 8)
1271 bufio.write(b"abc")
1272 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001273 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001274 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001275
1276 def test_truncate(self):
1277 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001278 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001279 bufio = self.tp(raw, 8)
1280 bufio.write(b"abcdef")
1281 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001282 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001283 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001284 self.assertEqual(f.read(), b"abc")
1285
Victor Stinner45df8202010-04-28 22:31:17 +00001286 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001287 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001288 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001289 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001290 # Write out many bytes from many threads and test they were
1291 # all flushed.
1292 N = 1000
1293 contents = bytes(range(256)) * N
1294 sizes = cycle([1, 19])
1295 n = 0
1296 queue = deque()
1297 while n < len(contents):
1298 size = next(sizes)
1299 queue.append(contents[n:n+size])
1300 n += size
1301 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001302 # We use a real file object because it allows us to
1303 # exercise situations where the GIL is released before
1304 # writing the buffer to the raw streams. This is in addition
1305 # to concurrency issues due to switching threads in the middle
1306 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001307 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001308 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001309 errors = []
1310 def f():
1311 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001312 while True:
1313 try:
1314 s = queue.popleft()
1315 except IndexError:
1316 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001317 bufio.write(s)
1318 except Exception as e:
1319 errors.append(e)
1320 raise
1321 threads = [threading.Thread(target=f) for x in range(20)]
1322 for t in threads:
1323 t.start()
1324 time.sleep(0.02) # yield
1325 for t in threads:
1326 t.join()
1327 self.assertFalse(errors,
1328 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001329 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001330 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001331 s = f.read()
1332 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001333 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001334 finally:
1335 support.unlink(support.TESTFN)
1336
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001337 def test_misbehaved_io(self):
1338 rawio = self.MisbehavedRawIO()
1339 bufio = self.tp(rawio, 5)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001340 self.assertRaises(OSError, bufio.seek, 0)
1341 self.assertRaises(OSError, bufio.tell)
1342 self.assertRaises(OSError, bufio.write, b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001343
Florent Xicluna109d5732012-07-07 17:03:22 +02001344 def test_max_buffer_size_removal(self):
1345 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001346 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001347
Benjamin Peterson68623612012-12-20 11:53:11 -06001348 def test_write_error_on_close(self):
1349 raw = self.MockRawIO()
1350 def bad_write(b):
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001351 raise OSError()
Benjamin Peterson68623612012-12-20 11:53:11 -06001352 raw.write = bad_write
1353 b = self.tp(raw)
1354 b.write(b'spam')
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001355 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001356 self.assertTrue(b.closed)
1357
Benjamin Peterson59406a92009-03-26 17:10:29 +00001358
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001359class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001360 tp = io.BufferedWriter
1361
1362 def test_constructor(self):
1363 BufferedWriterTest.test_constructor(self)
1364 # The allocation can succeed on 32-bit builds, e.g. with more
1365 # than 2GB RAM and a 64-bit kernel.
1366 if sys.maxsize > 0x7FFFFFFF:
1367 rawio = self.MockRawIO()
1368 bufio = self.tp(rawio)
1369 self.assertRaises((OverflowError, MemoryError, ValueError),
1370 bufio.__init__, rawio, sys.maxsize)
1371
1372 def test_initialization(self):
1373 rawio = self.MockRawIO()
1374 bufio = self.tp(rawio)
1375 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1376 self.assertRaises(ValueError, bufio.write, b"def")
1377 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1378 self.assertRaises(ValueError, bufio.write, b"def")
1379 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1380 self.assertRaises(ValueError, bufio.write, b"def")
1381
1382 def test_garbage_collection(self):
1383 # C BufferedWriter objects are collected, and collecting them flushes
1384 # all data to disk.
1385 # The Python version has __del__, so it ends into gc.garbage instead
Antoine Pitrou796564c2013-07-30 19:59:21 +02001386 with support.check_warnings(('', ResourceWarning)):
1387 rawio = self.FileIO(support.TESTFN, "w+b")
1388 f = self.tp(rawio)
1389 f.write(b"123xxx")
1390 f.x = f
1391 wr = weakref.ref(f)
1392 del f
1393 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001394 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001395 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001396 self.assertEqual(f.read(), b"123xxx")
1397
R David Murray67bfe802013-02-23 21:51:05 -05001398 def test_args_error(self):
1399 # Issue #17275
1400 with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1401 self.tp(io.BytesIO(), 1024, 1024, 1024)
1402
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001403
1404class PyBufferedWriterTest(BufferedWriterTest):
1405 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001406
Guido van Rossum01a27522007-03-07 01:00:12 +00001407class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001408
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001409 def test_constructor(self):
1410 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001411 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001412
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001413 def test_uninitialized(self):
1414 pair = self.tp.__new__(self.tp)
1415 del pair
1416 pair = self.tp.__new__(self.tp)
1417 self.assertRaisesRegex((ValueError, AttributeError),
1418 'uninitialized|has no attribute',
1419 pair.read, 0)
1420 self.assertRaisesRegex((ValueError, AttributeError),
1421 'uninitialized|has no attribute',
1422 pair.write, b'')
1423 pair.__init__(self.MockRawIO(), self.MockRawIO())
1424 self.assertEqual(pair.read(0), b'')
1425 self.assertEqual(pair.write(b''), 0)
1426
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001427 def test_detach(self):
1428 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1429 self.assertRaises(self.UnsupportedOperation, pair.detach)
1430
Florent Xicluna109d5732012-07-07 17:03:22 +02001431 def test_constructor_max_buffer_size_removal(self):
1432 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001433 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001434
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001435 def test_constructor_with_not_readable(self):
1436 class NotReadable(MockRawIO):
1437 def readable(self):
1438 return False
1439
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001440 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001441
1442 def test_constructor_with_not_writeable(self):
1443 class NotWriteable(MockRawIO):
1444 def writable(self):
1445 return False
1446
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001447 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001448
1449 def test_read(self):
1450 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1451
1452 self.assertEqual(pair.read(3), b"abc")
1453 self.assertEqual(pair.read(1), b"d")
1454 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001455 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1456 self.assertEqual(pair.read(None), b"abc")
1457
1458 def test_readlines(self):
1459 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1460 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1461 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1462 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001463
1464 def test_read1(self):
1465 # .read1() is delegated to the underlying reader object, so this test
1466 # can be shallow.
1467 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1468
1469 self.assertEqual(pair.read1(3), b"abc")
1470
1471 def test_readinto(self):
1472 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1473
1474 data = bytearray(5)
1475 self.assertEqual(pair.readinto(data), 5)
1476 self.assertEqual(data, b"abcde")
1477
1478 def test_write(self):
1479 w = self.MockRawIO()
1480 pair = self.tp(self.MockRawIO(), w)
1481
1482 pair.write(b"abc")
1483 pair.flush()
1484 pair.write(b"def")
1485 pair.flush()
1486 self.assertEqual(w._write_stack, [b"abc", b"def"])
1487
1488 def test_peek(self):
1489 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1490
1491 self.assertTrue(pair.peek(3).startswith(b"abc"))
1492 self.assertEqual(pair.read(3), b"abc")
1493
1494 def test_readable(self):
1495 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1496 self.assertTrue(pair.readable())
1497
1498 def test_writeable(self):
1499 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1500 self.assertTrue(pair.writable())
1501
1502 def test_seekable(self):
1503 # BufferedRWPairs are never seekable, even if their readers and writers
1504 # are.
1505 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1506 self.assertFalse(pair.seekable())
1507
1508 # .flush() is delegated to the underlying writer object and has been
1509 # tested in the test_write method.
1510
1511 def test_close_and_closed(self):
1512 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1513 self.assertFalse(pair.closed)
1514 pair.close()
1515 self.assertTrue(pair.closed)
1516
1517 def test_isatty(self):
1518 class SelectableIsAtty(MockRawIO):
1519 def __init__(self, isatty):
1520 MockRawIO.__init__(self)
1521 self._isatty = isatty
1522
1523 def isatty(self):
1524 return self._isatty
1525
1526 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1527 self.assertFalse(pair.isatty())
1528
1529 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1530 self.assertTrue(pair.isatty())
1531
1532 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1533 self.assertTrue(pair.isatty())
1534
1535 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1536 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001537
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001538class CBufferedRWPairTest(BufferedRWPairTest):
1539 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001540
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001541class PyBufferedRWPairTest(BufferedRWPairTest):
1542 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001543
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001544
1545class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1546 read_mode = "rb+"
1547 write_mode = "wb+"
1548
1549 def test_constructor(self):
1550 BufferedReaderTest.test_constructor(self)
1551 BufferedWriterTest.test_constructor(self)
1552
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001553 def test_uninitialized(self):
1554 BufferedReaderTest.test_uninitialized(self)
1555 BufferedWriterTest.test_uninitialized(self)
1556
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001557 def test_read_and_write(self):
1558 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001559 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001560
1561 self.assertEqual(b"as", rw.read(2))
1562 rw.write(b"ddd")
1563 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001564 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001565 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001566 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001567
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001568 def test_seek_and_tell(self):
1569 raw = self.BytesIO(b"asdfghjkl")
1570 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001571
Ezio Melottib3aedd42010-11-20 19:04:17 +00001572 self.assertEqual(b"as", rw.read(2))
1573 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001574 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001575 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00001576
Antoine Pitroue05565e2011-08-20 14:39:23 +02001577 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00001578 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02001579 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001580 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001581 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001582 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001583 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001584 self.assertEqual(7, rw.tell())
1585 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02001586 rw.flush()
1587 self.assertEqual(b"asdf123fl", raw.getvalue())
1588
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001589 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001590
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001591 def check_flush_and_read(self, read_func):
1592 raw = self.BytesIO(b"abcdefghi")
1593 bufio = self.tp(raw)
1594
Ezio Melottib3aedd42010-11-20 19:04:17 +00001595 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001596 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001597 self.assertEqual(b"ef", read_func(bufio, 2))
1598 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001599 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001600 self.assertEqual(6, bufio.tell())
1601 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001602 raw.seek(0, 0)
1603 raw.write(b"XYZ")
1604 # flush() resets the read buffer
1605 bufio.flush()
1606 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001607 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001608
1609 def test_flush_and_read(self):
1610 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1611
1612 def test_flush_and_readinto(self):
1613 def _readinto(bufio, n=-1):
1614 b = bytearray(n if n >= 0 else 9999)
1615 n = bufio.readinto(b)
1616 return bytes(b[:n])
1617 self.check_flush_and_read(_readinto)
1618
1619 def test_flush_and_peek(self):
1620 def _peek(bufio, n=-1):
1621 # This relies on the fact that the buffer can contain the whole
1622 # raw stream, otherwise peek() can return less.
1623 b = bufio.peek(n)
1624 if n != -1:
1625 b = b[:n]
1626 bufio.seek(len(b), 1)
1627 return b
1628 self.check_flush_and_read(_peek)
1629
1630 def test_flush_and_write(self):
1631 raw = self.BytesIO(b"abcdefghi")
1632 bufio = self.tp(raw)
1633
1634 bufio.write(b"123")
1635 bufio.flush()
1636 bufio.write(b"45")
1637 bufio.flush()
1638 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001639 self.assertEqual(b"12345fghi", raw.getvalue())
1640 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001641
1642 def test_threads(self):
1643 BufferedReaderTest.test_threads(self)
1644 BufferedWriterTest.test_threads(self)
1645
1646 def test_writes_and_peek(self):
1647 def _peek(bufio):
1648 bufio.peek(1)
1649 self.check_writes(_peek)
1650 def _peek(bufio):
1651 pos = bufio.tell()
1652 bufio.seek(-1, 1)
1653 bufio.peek(1)
1654 bufio.seek(pos, 0)
1655 self.check_writes(_peek)
1656
1657 def test_writes_and_reads(self):
1658 def _read(bufio):
1659 bufio.seek(-1, 1)
1660 bufio.read(1)
1661 self.check_writes(_read)
1662
1663 def test_writes_and_read1s(self):
1664 def _read1(bufio):
1665 bufio.seek(-1, 1)
1666 bufio.read1(1)
1667 self.check_writes(_read1)
1668
1669 def test_writes_and_readintos(self):
1670 def _read(bufio):
1671 bufio.seek(-1, 1)
1672 bufio.readinto(bytearray(1))
1673 self.check_writes(_read)
1674
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001675 def test_write_after_readahead(self):
1676 # Issue #6629: writing after the buffer was filled by readahead should
1677 # first rewind the raw stream.
1678 for overwrite_size in [1, 5]:
1679 raw = self.BytesIO(b"A" * 10)
1680 bufio = self.tp(raw, 4)
1681 # Trigger readahead
1682 self.assertEqual(bufio.read(1), b"A")
1683 self.assertEqual(bufio.tell(), 1)
1684 # Overwriting should rewind the raw stream if it needs so
1685 bufio.write(b"B" * overwrite_size)
1686 self.assertEqual(bufio.tell(), overwrite_size + 1)
1687 # If the write size was smaller than the buffer size, flush() and
1688 # check that rewind happens.
1689 bufio.flush()
1690 self.assertEqual(bufio.tell(), overwrite_size + 1)
1691 s = raw.getvalue()
1692 self.assertEqual(s,
1693 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1694
Antoine Pitrou7c404892011-05-13 00:13:33 +02001695 def test_write_rewind_write(self):
1696 # Various combinations of reading / writing / seeking backwards / writing again
1697 def mutate(bufio, pos1, pos2):
1698 assert pos2 >= pos1
1699 # Fill the buffer
1700 bufio.seek(pos1)
1701 bufio.read(pos2 - pos1)
1702 bufio.write(b'\x02')
1703 # This writes earlier than the previous write, but still inside
1704 # the buffer.
1705 bufio.seek(pos1)
1706 bufio.write(b'\x01')
1707
1708 b = b"\x80\x81\x82\x83\x84"
1709 for i in range(0, len(b)):
1710 for j in range(i, len(b)):
1711 raw = self.BytesIO(b)
1712 bufio = self.tp(raw, 100)
1713 mutate(bufio, i, j)
1714 bufio.flush()
1715 expected = bytearray(b)
1716 expected[j] = 2
1717 expected[i] = 1
1718 self.assertEqual(raw.getvalue(), expected,
1719 "failed result for i=%d, j=%d" % (i, j))
1720
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001721 def test_truncate_after_read_or_write(self):
1722 raw = self.BytesIO(b"A" * 10)
1723 bufio = self.tp(raw, 100)
1724 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1725 self.assertEqual(bufio.truncate(), 2)
1726 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1727 self.assertEqual(bufio.truncate(), 4)
1728
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001729 def test_misbehaved_io(self):
1730 BufferedReaderTest.test_misbehaved_io(self)
1731 BufferedWriterTest.test_misbehaved_io(self)
1732
Antoine Pitroue05565e2011-08-20 14:39:23 +02001733 def test_interleaved_read_write(self):
1734 # Test for issue #12213
1735 with self.BytesIO(b'abcdefgh') as raw:
1736 with self.tp(raw, 100) as f:
1737 f.write(b"1")
1738 self.assertEqual(f.read(1), b'b')
1739 f.write(b'2')
1740 self.assertEqual(f.read1(1), b'd')
1741 f.write(b'3')
1742 buf = bytearray(1)
1743 f.readinto(buf)
1744 self.assertEqual(buf, b'f')
1745 f.write(b'4')
1746 self.assertEqual(f.peek(1), b'h')
1747 f.flush()
1748 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1749
1750 with self.BytesIO(b'abc') as raw:
1751 with self.tp(raw, 100) as f:
1752 self.assertEqual(f.read(1), b'a')
1753 f.write(b"2")
1754 self.assertEqual(f.read(1), b'c')
1755 f.flush()
1756 self.assertEqual(raw.getvalue(), b'a2c')
1757
1758 def test_interleaved_readline_write(self):
1759 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1760 with self.tp(raw) as f:
1761 f.write(b'1')
1762 self.assertEqual(f.readline(), b'b\n')
1763 f.write(b'2')
1764 self.assertEqual(f.readline(), b'def\n')
1765 f.write(b'3')
1766 self.assertEqual(f.readline(), b'\n')
1767 f.flush()
1768 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1769
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001770 # You can't construct a BufferedRandom over a non-seekable stream.
1771 test_unseekable = None
1772
R David Murray67bfe802013-02-23 21:51:05 -05001773
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001774class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001775 tp = io.BufferedRandom
1776
1777 def test_constructor(self):
1778 BufferedRandomTest.test_constructor(self)
1779 # The allocation can succeed on 32-bit builds, e.g. with more
1780 # than 2GB RAM and a 64-bit kernel.
1781 if sys.maxsize > 0x7FFFFFFF:
1782 rawio = self.MockRawIO()
1783 bufio = self.tp(rawio)
1784 self.assertRaises((OverflowError, MemoryError, ValueError),
1785 bufio.__init__, rawio, sys.maxsize)
1786
1787 def test_garbage_collection(self):
1788 CBufferedReaderTest.test_garbage_collection(self)
1789 CBufferedWriterTest.test_garbage_collection(self)
1790
R David Murray67bfe802013-02-23 21:51:05 -05001791 def test_args_error(self):
1792 # Issue #17275
1793 with self.assertRaisesRegex(TypeError, "BufferedRandom"):
1794 self.tp(io.BytesIO(), 1024, 1024, 1024)
1795
1796
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001797class PyBufferedRandomTest(BufferedRandomTest):
1798 tp = pyio.BufferedRandom
1799
1800
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001801# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1802# properties:
1803# - A single output character can correspond to many bytes of input.
1804# - The number of input bytes to complete the character can be
1805# undetermined until the last input byte is received.
1806# - The number of input bytes can vary depending on previous input.
1807# - A single input byte can correspond to many characters of output.
1808# - The number of output characters can be undetermined until the
1809# last input byte is received.
1810# - The number of output characters can vary depending on previous input.
1811
1812class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1813 """
1814 For testing seek/tell behavior with a stateful, buffering decoder.
1815
1816 Input is a sequence of words. Words may be fixed-length (length set
1817 by input) or variable-length (period-terminated). In variable-length
1818 mode, extra periods are ignored. Possible words are:
1819 - 'i' followed by a number sets the input length, I (maximum 99).
1820 When I is set to 0, words are space-terminated.
1821 - 'o' followed by a number sets the output length, O (maximum 99).
1822 - Any other word is converted into a word followed by a period on
1823 the output. The output word consists of the input word truncated
1824 or padded out with hyphens to make its length equal to O. If O
1825 is 0, the word is output verbatim without truncating or padding.
1826 I and O are initially set to 1. When I changes, any buffered input is
1827 re-scanned according to the new I. EOF also terminates the last word.
1828 """
1829
1830 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001831 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001832 self.reset()
1833
1834 def __repr__(self):
1835 return '<SID %x>' % id(self)
1836
1837 def reset(self):
1838 self.i = 1
1839 self.o = 1
1840 self.buffer = bytearray()
1841
1842 def getstate(self):
1843 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1844 return bytes(self.buffer), i*100 + o
1845
1846 def setstate(self, state):
1847 buffer, io = state
1848 self.buffer = bytearray(buffer)
1849 i, o = divmod(io, 100)
1850 self.i, self.o = i ^ 1, o ^ 1
1851
1852 def decode(self, input, final=False):
1853 output = ''
1854 for b in input:
1855 if self.i == 0: # variable-length, terminated with period
1856 if b == ord('.'):
1857 if self.buffer:
1858 output += self.process_word()
1859 else:
1860 self.buffer.append(b)
1861 else: # fixed-length, terminate after self.i bytes
1862 self.buffer.append(b)
1863 if len(self.buffer) == self.i:
1864 output += self.process_word()
1865 if final and self.buffer: # EOF terminates the last word
1866 output += self.process_word()
1867 return output
1868
1869 def process_word(self):
1870 output = ''
1871 if self.buffer[0] == ord('i'):
1872 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1873 elif self.buffer[0] == ord('o'):
1874 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1875 else:
1876 output = self.buffer.decode('ascii')
1877 if len(output) < self.o:
1878 output += '-'*self.o # pad out with hyphens
1879 if self.o:
1880 output = output[:self.o] # truncate to output length
1881 output += '.'
1882 self.buffer = bytearray()
1883 return output
1884
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001885 codecEnabled = False
1886
1887 @classmethod
1888 def lookupTestDecoder(cls, name):
1889 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001890 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001891 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001892 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001893 incrementalencoder=None,
1894 streamreader=None, streamwriter=None,
1895 incrementaldecoder=cls)
1896
1897# Register the previous decoder for testing.
1898# Disabled by default, tests will enable it.
1899codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1900
1901
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001902class StatefulIncrementalDecoderTest(unittest.TestCase):
1903 """
1904 Make sure the StatefulIncrementalDecoder actually works.
1905 """
1906
1907 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001908 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001909 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001910 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001911 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001912 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001913 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001914 # I=0, O=6 (variable-length input, fixed-length output)
1915 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1916 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001917 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001918 # I=6, O=3 (fixed-length input > fixed-length output)
1919 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1920 # I=0, then 3; O=29, then 15 (with longer output)
1921 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1922 'a----------------------------.' +
1923 'b----------------------------.' +
1924 'cde--------------------------.' +
1925 'abcdefghijabcde.' +
1926 'a.b------------.' +
1927 '.c.------------.' +
1928 'd.e------------.' +
1929 'k--------------.' +
1930 'l--------------.' +
1931 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001932 ]
1933
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001934 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001935 # Try a few one-shot test cases.
1936 for input, eof, output in self.test_cases:
1937 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001938 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001939
1940 # Also test an unfinished decode, followed by forcing EOF.
1941 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001942 self.assertEqual(d.decode(b'oiabcd'), '')
1943 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001944
1945class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001946
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001947 def setUp(self):
1948 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1949 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001950 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001951
Guido van Rossumd0712812007-04-11 16:32:43 +00001952 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001953 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001954
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001955 def test_constructor(self):
1956 r = self.BytesIO(b"\xc3\xa9\n\n")
1957 b = self.BufferedReader(r, 1000)
1958 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001959 t.__init__(b, encoding="latin-1", newline="\r\n")
1960 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001961 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001962 t.__init__(b, encoding="utf-8", line_buffering=True)
1963 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001964 self.assertEqual(t.line_buffering, True)
1965 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001966 self.assertRaises(TypeError, t.__init__, b, newline=42)
1967 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1968
Nick Coghlana9b15242014-02-04 22:11:18 +10001969 def test_non_text_encoding_codecs_are_rejected(self):
1970 # Ensure the constructor complains if passed a codec that isn't
1971 # marked as a text encoding
1972 # http://bugs.python.org/issue20404
1973 r = self.BytesIO()
1974 b = self.BufferedWriter(r)
1975 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
1976 self.TextIOWrapper(b, encoding="hex")
1977
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001978 def test_detach(self):
1979 r = self.BytesIO()
1980 b = self.BufferedWriter(r)
1981 t = self.TextIOWrapper(b)
1982 self.assertIs(t.detach(), b)
1983
1984 t = self.TextIOWrapper(b, encoding="ascii")
1985 t.write("howdy")
1986 self.assertFalse(r.getvalue())
1987 t.detach()
1988 self.assertEqual(r.getvalue(), b"howdy")
1989 self.assertRaises(ValueError, t.detach)
1990
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001991 def test_repr(self):
1992 raw = self.BytesIO("hello".encode("utf-8"))
1993 b = self.BufferedReader(raw)
1994 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001995 modname = self.TextIOWrapper.__module__
1996 self.assertEqual(repr(t),
1997 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1998 raw.name = "dummy"
1999 self.assertEqual(repr(t),
2000 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002001 t.mode = "r"
2002 self.assertEqual(repr(t),
2003 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002004 raw.name = b"dummy"
2005 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002006 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002007
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002008 def test_line_buffering(self):
2009 r = self.BytesIO()
2010 b = self.BufferedWriter(r, 1000)
2011 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002012 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002013 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002014 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002015 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002016 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002017 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002018
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002019 def test_default_encoding(self):
2020 old_environ = dict(os.environ)
2021 try:
2022 # try to get a user preferred encoding different than the current
2023 # locale encoding to check that TextIOWrapper() uses the current
2024 # locale encoding and not the user preferred encoding
2025 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2026 if key in os.environ:
2027 del os.environ[key]
2028
2029 current_locale_encoding = locale.getpreferredencoding(False)
2030 b = self.BytesIO()
2031 t = self.TextIOWrapper(b)
2032 self.assertEqual(t.encoding, current_locale_encoding)
2033 finally:
2034 os.environ.clear()
2035 os.environ.update(old_environ)
2036
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002037 @support.cpython_only
Serhiy Storchaka78980432013-01-15 01:12:17 +02002038 def test_device_encoding(self):
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002039 # Issue 15989
2040 import _testcapi
Serhiy Storchaka78980432013-01-15 01:12:17 +02002041 b = self.BytesIO()
2042 b.fileno = lambda: _testcapi.INT_MAX + 1
2043 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2044 b.fileno = lambda: _testcapi.UINT_MAX + 1
2045 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2046
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002047 def test_encoding(self):
2048 # Check the encoding attribute is always set, and valid
2049 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002050 t = self.TextIOWrapper(b, encoding="utf-8")
2051 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002052 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002053 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002054 codecs.lookup(t.encoding)
2055
2056 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002057 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002058 b = self.BytesIO(b"abc\n\xff\n")
2059 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002060 self.assertRaises(UnicodeError, t.read)
2061 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002062 b = self.BytesIO(b"abc\n\xff\n")
2063 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002064 self.assertRaises(UnicodeError, t.read)
2065 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002066 b = self.BytesIO(b"abc\n\xff\n")
2067 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002068 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002069 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002070 b = self.BytesIO(b"abc\n\xff\n")
2071 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002072 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002073
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002074 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002075 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002076 b = self.BytesIO()
2077 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002078 self.assertRaises(UnicodeError, t.write, "\xff")
2079 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002080 b = self.BytesIO()
2081 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002082 self.assertRaises(UnicodeError, t.write, "\xff")
2083 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002084 b = self.BytesIO()
2085 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002086 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002087 t.write("abc\xffdef\n")
2088 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002089 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002090 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002091 b = self.BytesIO()
2092 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002093 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002094 t.write("abc\xffdef\n")
2095 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002096 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002097
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002098 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002099 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2100
2101 tests = [
2102 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002103 [ '', input_lines ],
2104 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2105 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2106 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002107 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002108 encodings = (
2109 'utf-8', 'latin-1',
2110 'utf-16', 'utf-16-le', 'utf-16-be',
2111 'utf-32', 'utf-32-le', 'utf-32-be',
2112 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002113
Guido van Rossum8358db22007-08-18 21:39:55 +00002114 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002115 # character in TextIOWrapper._pending_line.
2116 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002117 # XXX: str.encode() should return bytes
2118 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002119 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002120 for bufsize in range(1, 10):
2121 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002122 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2123 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002124 encoding=encoding)
2125 if do_reads:
2126 got_lines = []
2127 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002128 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002129 if c2 == '':
2130 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002131 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002132 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002133 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002134 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002135
2136 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002137 self.assertEqual(got_line, exp_line)
2138 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002139
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002140 def test_newlines_input(self):
2141 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002142 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2143 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002144 (None, normalized.decode("ascii").splitlines(keepends=True)),
2145 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002146 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2147 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2148 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002149 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002150 buf = self.BytesIO(testdata)
2151 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002152 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002153 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002154 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002155
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002156 def test_newlines_output(self):
2157 testdict = {
2158 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2159 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2160 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2161 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2162 }
2163 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2164 for newline, expected in tests:
2165 buf = self.BytesIO()
2166 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2167 txt.write("AAA\nB")
2168 txt.write("BB\nCCC\n")
2169 txt.write("X\rY\r\nZ")
2170 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002171 self.assertEqual(buf.closed, False)
2172 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002173
2174 def test_destructor(self):
2175 l = []
2176 base = self.BytesIO
2177 class MyBytesIO(base):
2178 def close(self):
2179 l.append(self.getvalue())
2180 base.close(self)
2181 b = MyBytesIO()
2182 t = self.TextIOWrapper(b, encoding="ascii")
2183 t.write("abc")
2184 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002185 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002186 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002187
2188 def test_override_destructor(self):
2189 record = []
2190 class MyTextIO(self.TextIOWrapper):
2191 def __del__(self):
2192 record.append(1)
2193 try:
2194 f = super().__del__
2195 except AttributeError:
2196 pass
2197 else:
2198 f()
2199 def close(self):
2200 record.append(2)
2201 super().close()
2202 def flush(self):
2203 record.append(3)
2204 super().flush()
2205 b = self.BytesIO()
2206 t = MyTextIO(b, encoding="ascii")
2207 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002208 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002209 self.assertEqual(record, [1, 2, 3])
2210
2211 def test_error_through_destructor(self):
2212 # Test that the exception state is not modified by a destructor,
2213 # even if close() fails.
2214 rawio = self.CloseFailureIO()
2215 def f():
2216 self.TextIOWrapper(rawio).xyzzy
2217 with support.captured_output("stderr") as s:
2218 self.assertRaises(AttributeError, f)
2219 s = s.getvalue().strip()
2220 if s:
2221 # The destructor *may* have printed an unraisable error, check it
2222 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002223 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002224 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002225
Guido van Rossum9b76da62007-04-11 01:09:03 +00002226 # Systematic tests of the text I/O API
2227
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002228 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002229 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002230 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002231 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002232 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002233 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002234 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002235 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002236 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002237 self.assertEqual(f.tell(), 0)
2238 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002239 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002240 self.assertEqual(f.seek(0), 0)
2241 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002242 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002243 self.assertEqual(f.read(2), "ab")
2244 self.assertEqual(f.read(1), "c")
2245 self.assertEqual(f.read(1), "")
2246 self.assertEqual(f.read(), "")
2247 self.assertEqual(f.tell(), cookie)
2248 self.assertEqual(f.seek(0), 0)
2249 self.assertEqual(f.seek(0, 2), cookie)
2250 self.assertEqual(f.write("def"), 3)
2251 self.assertEqual(f.seek(cookie), cookie)
2252 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002253 if enc.startswith("utf"):
2254 self.multi_line_test(f, enc)
2255 f.close()
2256
2257 def multi_line_test(self, f, enc):
2258 f.seek(0)
2259 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002260 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002261 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002262 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002263 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002264 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002265 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002266 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002267 wlines.append((f.tell(), line))
2268 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002269 f.seek(0)
2270 rlines = []
2271 while True:
2272 pos = f.tell()
2273 line = f.readline()
2274 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002275 break
2276 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002277 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002278
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002279 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002280 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002281 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002282 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002283 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002284 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002285 p2 = f.tell()
2286 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002287 self.assertEqual(f.tell(), p0)
2288 self.assertEqual(f.readline(), "\xff\n")
2289 self.assertEqual(f.tell(), p1)
2290 self.assertEqual(f.readline(), "\xff\n")
2291 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002292 f.seek(0)
2293 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002294 self.assertEqual(line, "\xff\n")
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002295 self.assertRaises(OSError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002296 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002297 f.close()
2298
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002299 def test_seeking(self):
2300 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002301 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002302 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002303 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002304 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002305 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002306 suffix = bytes(u_suffix.encode("utf-8"))
2307 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002308 with self.open(support.TESTFN, "wb") as f:
2309 f.write(line*2)
2310 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2311 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002312 self.assertEqual(s, str(prefix, "ascii"))
2313 self.assertEqual(f.tell(), prefix_size)
2314 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002315
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002316 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002317 # Regression test for a specific bug
2318 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002319 with self.open(support.TESTFN, "wb") as f:
2320 f.write(data)
2321 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2322 f._CHUNK_SIZE # Just test that it exists
2323 f._CHUNK_SIZE = 2
2324 f.readline()
2325 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002326
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002327 def test_seek_and_tell(self):
2328 #Test seek/tell using the StatefulIncrementalDecoder.
2329 # Make test faster by doing smaller seeks
2330 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002331
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002332 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002333 """Tell/seek to various points within a data stream and ensure
2334 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002335 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002336 f.write(data)
2337 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002338 f = self.open(support.TESTFN, encoding='test_decoder')
2339 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002340 decoded = f.read()
2341 f.close()
2342
Neal Norwitze2b07052008-03-18 19:52:05 +00002343 for i in range(min_pos, len(decoded) + 1): # seek positions
2344 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002345 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002346 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002347 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002348 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002349 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002350 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002351 f.close()
2352
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002353 # Enable the test decoder.
2354 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002355
2356 # Run the tests.
2357 try:
2358 # Try each test case.
2359 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002360 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002361
2362 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002363 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2364 offset = CHUNK_SIZE - len(input)//2
2365 prefix = b'.'*offset
2366 # Don't bother seeking into the prefix (takes too long).
2367 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002368 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002369
2370 # Ensure our test decoder won't interfere with subsequent tests.
2371 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002372 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002373
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002374 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002375 data = "1234567890"
2376 tests = ("utf-16",
2377 "utf-16-le",
2378 "utf-16-be",
2379 "utf-32",
2380 "utf-32-le",
2381 "utf-32-be")
2382 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002383 buf = self.BytesIO()
2384 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002385 # Check if the BOM is written only once (see issue1753).
2386 f.write(data)
2387 f.write(data)
2388 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002389 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002390 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002391 self.assertEqual(f.read(), data * 2)
2392 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002393
Benjamin Petersona1b49012009-03-31 23:11:32 +00002394 def test_unreadable(self):
2395 class UnReadable(self.BytesIO):
2396 def readable(self):
2397 return False
2398 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002399 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00002400
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002401 def test_read_one_by_one(self):
2402 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002403 reads = ""
2404 while True:
2405 c = txt.read(1)
2406 if not c:
2407 break
2408 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002409 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002410
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002411 def test_readlines(self):
2412 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2413 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2414 txt.seek(0)
2415 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2416 txt.seek(0)
2417 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2418
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002419 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002420 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002421 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002422 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002423 reads = ""
2424 while True:
2425 c = txt.read(128)
2426 if not c:
2427 break
2428 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002429 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002430
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002431 def test_writelines(self):
2432 l = ['ab', 'cd', 'ef']
2433 buf = self.BytesIO()
2434 txt = self.TextIOWrapper(buf)
2435 txt.writelines(l)
2436 txt.flush()
2437 self.assertEqual(buf.getvalue(), b'abcdef')
2438
2439 def test_writelines_userlist(self):
2440 l = UserList(['ab', 'cd', 'ef'])
2441 buf = self.BytesIO()
2442 txt = self.TextIOWrapper(buf)
2443 txt.writelines(l)
2444 txt.flush()
2445 self.assertEqual(buf.getvalue(), b'abcdef')
2446
2447 def test_writelines_error(self):
2448 txt = self.TextIOWrapper(self.BytesIO())
2449 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2450 self.assertRaises(TypeError, txt.writelines, None)
2451 self.assertRaises(TypeError, txt.writelines, b'abc')
2452
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002453 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002454 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002455
2456 # read one char at a time
2457 reads = ""
2458 while True:
2459 c = txt.read(1)
2460 if not c:
2461 break
2462 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002463 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002464
2465 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002466 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002467 txt._CHUNK_SIZE = 4
2468
2469 reads = ""
2470 while True:
2471 c = txt.read(4)
2472 if not c:
2473 break
2474 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002475 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002476
2477 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002478 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002479 txt._CHUNK_SIZE = 4
2480
2481 reads = txt.read(4)
2482 reads += txt.read(4)
2483 reads += txt.readline()
2484 reads += txt.readline()
2485 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002486 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002487
2488 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002489 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002490 txt._CHUNK_SIZE = 4
2491
2492 reads = txt.read(4)
2493 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002494 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002495
2496 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002497 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002498 txt._CHUNK_SIZE = 4
2499
2500 reads = txt.read(4)
2501 pos = txt.tell()
2502 txt.seek(0)
2503 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002504 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002505
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002506 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002507 buffer = self.BytesIO(self.testdata)
2508 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002509
2510 self.assertEqual(buffer.seekable(), txt.seekable())
2511
Antoine Pitroue4501852009-05-14 18:55:55 +00002512 def test_append_bom(self):
2513 # The BOM is not written again when appending to a non-empty file
2514 filename = support.TESTFN
2515 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2516 with self.open(filename, 'w', encoding=charset) as f:
2517 f.write('aaa')
2518 pos = f.tell()
2519 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002520 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002521
2522 with self.open(filename, 'a', encoding=charset) as f:
2523 f.write('xxx')
2524 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002525 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002526
2527 def test_seek_bom(self):
2528 # Same test, but when seeking manually
2529 filename = support.TESTFN
2530 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2531 with self.open(filename, 'w', encoding=charset) as f:
2532 f.write('aaa')
2533 pos = f.tell()
2534 with self.open(filename, 'r+', encoding=charset) as f:
2535 f.seek(pos)
2536 f.write('zzz')
2537 f.seek(0)
2538 f.write('bbb')
2539 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002540 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002541
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002542 def test_errors_property(self):
2543 with self.open(support.TESTFN, "w") as f:
2544 self.assertEqual(f.errors, "strict")
2545 with self.open(support.TESTFN, "w", errors="replace") as f:
2546 self.assertEqual(f.errors, "replace")
2547
Brett Cannon31f59292011-02-21 19:29:56 +00002548 @support.no_tracing
Victor Stinner45df8202010-04-28 22:31:17 +00002549 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002550 def test_threads_write(self):
2551 # Issue6750: concurrent writes could duplicate data
2552 event = threading.Event()
2553 with self.open(support.TESTFN, "w", buffering=1) as f:
2554 def run(n):
2555 text = "Thread%03d\n" % n
2556 event.wait()
2557 f.write(text)
2558 threads = [threading.Thread(target=lambda n=x: run(n))
2559 for x in range(20)]
2560 for t in threads:
2561 t.start()
2562 time.sleep(0.02)
2563 event.set()
2564 for t in threads:
2565 t.join()
2566 with self.open(support.TESTFN) as f:
2567 content = f.read()
2568 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002569 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002570
Antoine Pitrou6be88762010-05-03 16:48:20 +00002571 def test_flush_error_on_close(self):
2572 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2573 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002574 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00002575 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002576 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06002577 self.assertTrue(txt.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00002578
2579 def test_multi_close(self):
2580 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2581 txt.close()
2582 txt.close()
2583 txt.close()
2584 self.assertRaises(ValueError, txt.flush)
2585
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002586 def test_unseekable(self):
2587 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2588 self.assertRaises(self.UnsupportedOperation, txt.tell)
2589 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2590
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002591 def test_readonly_attributes(self):
2592 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2593 buf = self.BytesIO(self.testdata)
2594 with self.assertRaises(AttributeError):
2595 txt.buffer = buf
2596
Antoine Pitroue96ec682011-07-23 21:46:35 +02002597 def test_rawio(self):
2598 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
2599 # that subprocess.Popen() can have the required unbuffered
2600 # semantics with universal_newlines=True.
2601 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2602 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
2603 # Reads
2604 self.assertEqual(txt.read(4), 'abcd')
2605 self.assertEqual(txt.readline(), 'efghi\n')
2606 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
2607
2608 def test_rawio_write_through(self):
2609 # Issue #12591: with write_through=True, writes don't need a flush
2610 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2611 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
2612 write_through=True)
2613 txt.write('1')
2614 txt.write('23\n4')
2615 txt.write('5')
2616 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2617
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002618 def test_read_nonbytes(self):
2619 # Issue #17106
2620 # Crash when underlying read() returns non-bytes
2621 t = self.TextIOWrapper(self.StringIO('a'))
2622 self.assertRaises(TypeError, t.read, 1)
2623 t = self.TextIOWrapper(self.StringIO('a'))
2624 self.assertRaises(TypeError, t.readline)
2625 t = self.TextIOWrapper(self.StringIO('a'))
2626 self.assertRaises(TypeError, t.read)
2627
2628 def test_illegal_decoder(self):
2629 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10002630 # Bypass the early encoding check added in issue 20404
2631 def _make_illegal_wrapper():
2632 quopri = codecs.lookup("quopri")
2633 quopri._is_text_encoding = True
2634 try:
2635 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
2636 newline='\n', encoding="quopri")
2637 finally:
2638 quopri._is_text_encoding = False
2639 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002640 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10002641 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002642 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10002643 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002644 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10002645 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002646 self.assertRaises(TypeError, t.read)
2647
Antoine Pitrou712cb732013-12-21 15:51:54 +01002648 def _check_create_at_shutdown(self, **kwargs):
2649 # Issue #20037: creating a TextIOWrapper at shutdown
2650 # shouldn't crash the interpreter.
2651 iomod = self.io.__name__
2652 code = """if 1:
2653 import codecs
2654 import {iomod} as io
2655
2656 # Avoid looking up codecs at shutdown
2657 codecs.lookup('utf-8')
2658
2659 class C:
2660 def __init__(self):
2661 self.buf = io.BytesIO()
2662 def __del__(self):
2663 io.TextIOWrapper(self.buf, **{kwargs})
2664 print("ok")
2665 c = C()
2666 """.format(iomod=iomod, kwargs=kwargs)
2667 return assert_python_ok("-c", code)
2668
2669 def test_create_at_shutdown_without_encoding(self):
2670 rc, out, err = self._check_create_at_shutdown()
2671 if err:
2672 # Can error out with a RuntimeError if the module state
2673 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10002674 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01002675 else:
2676 self.assertEqual("ok", out.decode().strip())
2677
2678 def test_create_at_shutdown_with_encoding(self):
2679 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
2680 errors='strict')
2681 self.assertFalse(err)
2682 self.assertEqual("ok", out.decode().strip())
2683
Antoine Pitroub8503892014-04-29 10:14:02 +02002684 def test_read_byteslike(self):
2685 r = MemviewBytesIO(b'Just some random string\n')
2686 t = self.TextIOWrapper(r, 'utf-8')
2687
2688 # TextIOwrapper will not read the full string, because
2689 # we truncate it to a multiple of the native int size
2690 # so that we can construct a more complex memoryview.
2691 bytes_val = _to_memoryview(r.getvalue()).tobytes()
2692
2693 self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
2694
2695class MemviewBytesIO(io.BytesIO):
2696 '''A BytesIO object whose read method returns memoryviews
2697 rather than bytes'''
2698
2699 def read1(self, len_):
2700 return _to_memoryview(super().read1(len_))
2701
2702 def read(self, len_):
2703 return _to_memoryview(super().read(len_))
2704
2705def _to_memoryview(buf):
2706 '''Convert bytes-object *buf* to a non-trivial memoryview'''
2707
2708 arr = array.array('i')
2709 idx = len(buf) - len(buf) % arr.itemsize
2710 arr.frombytes(buf[:idx])
2711 return memoryview(arr)
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002712
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002713class CTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01002714 io = io
Nick Coghlana9b15242014-02-04 22:11:18 +10002715 shutdown_error = "RuntimeError: could not find io module state"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002716
2717 def test_initialization(self):
2718 r = self.BytesIO(b"\xc3\xa9\n\n")
2719 b = self.BufferedReader(r, 1000)
2720 t = self.TextIOWrapper(b)
2721 self.assertRaises(TypeError, t.__init__, b, newline=42)
2722 self.assertRaises(ValueError, t.read)
2723 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2724 self.assertRaises(ValueError, t.read)
2725
2726 def test_garbage_collection(self):
2727 # C TextIOWrapper objects are collected, and collecting them flushes
2728 # all data to disk.
2729 # The Python version has __del__, so it ends in gc.garbage instead.
Antoine Pitrou796564c2013-07-30 19:59:21 +02002730 with support.check_warnings(('', ResourceWarning)):
2731 rawio = io.FileIO(support.TESTFN, "wb")
2732 b = self.BufferedWriter(rawio)
2733 t = self.TextIOWrapper(b, encoding="ascii")
2734 t.write("456def")
2735 t.x = t
2736 wr = weakref.ref(t)
2737 del t
2738 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002739 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002740 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002741 self.assertEqual(f.read(), b"456def")
2742
Charles-François Natali42c28cd2011-10-05 19:53:43 +02002743 def test_rwpair_cleared_before_textio(self):
2744 # Issue 13070: TextIOWrapper's finalization would crash when called
2745 # after the reference to the underlying BufferedRWPair's writer got
2746 # cleared by the GC.
2747 for i in range(1000):
2748 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2749 t1 = self.TextIOWrapper(b1, encoding="ascii")
2750 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2751 t2 = self.TextIOWrapper(b2, encoding="ascii")
2752 # circular references
2753 t1.buddy = t2
2754 t2.buddy = t1
2755 support.gc_collect()
2756
2757
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002758class PyTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01002759 io = pyio
Serhiy Storchakad667d722014-02-10 19:09:19 +02002760 #shutdown_error = "LookupError: unknown encoding: ascii"
2761 shutdown_error = "TypeError: 'NoneType' object is not iterable"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002762
2763
2764class IncrementalNewlineDecoderTest(unittest.TestCase):
2765
2766 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002767 # UTF-8 specific tests for a newline decoder
2768 def _check_decode(b, s, **kwargs):
2769 # We exercise getstate() / setstate() as well as decode()
2770 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002771 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002772 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002773 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002774
Antoine Pitrou180a3362008-12-14 16:36:46 +00002775 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002776
Antoine Pitrou180a3362008-12-14 16:36:46 +00002777 _check_decode(b'\xe8', "")
2778 _check_decode(b'\xa2', "")
2779 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002780
Antoine Pitrou180a3362008-12-14 16:36:46 +00002781 _check_decode(b'\xe8', "")
2782 _check_decode(b'\xa2', "")
2783 _check_decode(b'\x88', "\u8888")
2784
2785 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002786 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2787
Antoine Pitrou180a3362008-12-14 16:36:46 +00002788 decoder.reset()
2789 _check_decode(b'\n', "\n")
2790 _check_decode(b'\r', "")
2791 _check_decode(b'', "\n", final=True)
2792 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002793
Antoine Pitrou180a3362008-12-14 16:36:46 +00002794 _check_decode(b'\r', "")
2795 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002796
Antoine Pitrou180a3362008-12-14 16:36:46 +00002797 _check_decode(b'\r\r\n', "\n\n")
2798 _check_decode(b'\r', "")
2799 _check_decode(b'\r', "\n")
2800 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002801
Antoine Pitrou180a3362008-12-14 16:36:46 +00002802 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2803 _check_decode(b'\xe8\xa2\x88', "\u8888")
2804 _check_decode(b'\n', "\n")
2805 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2806 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002807
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002808 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002809 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002810 if encoding is not None:
2811 encoder = codecs.getincrementalencoder(encoding)()
2812 def _decode_bytewise(s):
2813 # Decode one byte at a time
2814 for b in encoder.encode(s):
2815 result.append(decoder.decode(bytes([b])))
2816 else:
2817 encoder = None
2818 def _decode_bytewise(s):
2819 # Decode one char at a time
2820 for c in s:
2821 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002822 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002823 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002824 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00002825 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002826 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002827 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002828 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002829 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002830 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002831 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002832 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002833 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002834 input = "abc"
2835 if encoder is not None:
2836 encoder.reset()
2837 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002838 self.assertEqual(decoder.decode(input), "abc")
2839 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002840
2841 def test_newline_decoder(self):
2842 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002843 # None meaning the IncrementalNewlineDecoder takes unicode input
2844 # rather than bytes input
2845 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002846 'utf-16', 'utf-16-le', 'utf-16-be',
2847 'utf-32', 'utf-32-le', 'utf-32-be',
2848 )
2849 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002850 decoder = enc and codecs.getincrementaldecoder(enc)()
2851 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2852 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002853 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002854 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2855 self.check_newline_decoding_utf8(decoder)
2856
Antoine Pitrou66913e22009-03-06 23:40:56 +00002857 def test_newline_bytes(self):
2858 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2859 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002860 self.assertEqual(dec.newlines, None)
2861 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2862 self.assertEqual(dec.newlines, None)
2863 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2864 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00002865 dec = self.IncrementalNewlineDecoder(None, translate=False)
2866 _check(dec)
2867 dec = self.IncrementalNewlineDecoder(None, translate=True)
2868 _check(dec)
2869
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002870class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2871 pass
2872
2873class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2874 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002875
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002876
Guido van Rossum01a27522007-03-07 01:00:12 +00002877# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002878
Guido van Rossum5abbf752007-08-27 17:39:33 +00002879class MiscIOTest(unittest.TestCase):
2880
Barry Warsaw40e82462008-11-20 20:14:50 +00002881 def tearDown(self):
2882 support.unlink(support.TESTFN)
2883
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002884 def test___all__(self):
2885 for name in self.io.__all__:
2886 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002887 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002888 if name == "open":
2889 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002890 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002891 self.assertTrue(issubclass(obj, Exception), name)
2892 elif not name.startswith("SEEK_"):
2893 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002894
Barry Warsaw40e82462008-11-20 20:14:50 +00002895 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002896 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002897 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002898 f.close()
2899
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +02002900 with support.check_warnings(('', DeprecationWarning)):
2901 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002902 self.assertEqual(f.name, support.TESTFN)
2903 self.assertEqual(f.buffer.name, support.TESTFN)
2904 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2905 self.assertEqual(f.mode, "U")
2906 self.assertEqual(f.buffer.mode, "rb")
2907 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002908 f.close()
2909
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002910 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002911 self.assertEqual(f.mode, "w+")
2912 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2913 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002914
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002915 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002916 self.assertEqual(g.mode, "wb")
2917 self.assertEqual(g.raw.mode, "wb")
2918 self.assertEqual(g.name, f.fileno())
2919 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00002920 f.close()
2921 g.close()
2922
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002923 def test_io_after_close(self):
2924 for kwargs in [
2925 {"mode": "w"},
2926 {"mode": "wb"},
2927 {"mode": "w", "buffering": 1},
2928 {"mode": "w", "buffering": 2},
2929 {"mode": "wb", "buffering": 0},
2930 {"mode": "r"},
2931 {"mode": "rb"},
2932 {"mode": "r", "buffering": 1},
2933 {"mode": "r", "buffering": 2},
2934 {"mode": "rb", "buffering": 0},
2935 {"mode": "w+"},
2936 {"mode": "w+b"},
2937 {"mode": "w+", "buffering": 1},
2938 {"mode": "w+", "buffering": 2},
2939 {"mode": "w+b", "buffering": 0},
2940 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002941 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002942 f.close()
2943 self.assertRaises(ValueError, f.flush)
2944 self.assertRaises(ValueError, f.fileno)
2945 self.assertRaises(ValueError, f.isatty)
2946 self.assertRaises(ValueError, f.__iter__)
2947 if hasattr(f, "peek"):
2948 self.assertRaises(ValueError, f.peek, 1)
2949 self.assertRaises(ValueError, f.read)
2950 if hasattr(f, "read1"):
2951 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02002952 if hasattr(f, "readall"):
2953 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002954 if hasattr(f, "readinto"):
2955 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2956 self.assertRaises(ValueError, f.readline)
2957 self.assertRaises(ValueError, f.readlines)
2958 self.assertRaises(ValueError, f.seek, 0)
2959 self.assertRaises(ValueError, f.tell)
2960 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002961 self.assertRaises(ValueError, f.write,
2962 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002963 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002964 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002965
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002966 def test_blockingioerror(self):
2967 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002968 class C(str):
2969 pass
2970 c = C("")
2971 b = self.BlockingIOError(1, c)
2972 c.b = b
2973 b.c = c
2974 wr = weakref.ref(c)
2975 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002976 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002977 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002978
2979 def test_abcs(self):
2980 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002981 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2982 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2983 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2984 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002985
2986 def _check_abc_inheritance(self, abcmodule):
2987 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002988 self.assertIsInstance(f, abcmodule.IOBase)
2989 self.assertIsInstance(f, abcmodule.RawIOBase)
2990 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2991 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002992 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002993 self.assertIsInstance(f, abcmodule.IOBase)
2994 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2995 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2996 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002997 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002998 self.assertIsInstance(f, abcmodule.IOBase)
2999 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3000 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3001 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003002
3003 def test_abc_inheritance(self):
3004 # Test implementations inherit from their respective ABCs
3005 self._check_abc_inheritance(self)
3006
3007 def test_abc_inheritance_official(self):
3008 # Test implementations inherit from the official ABCs of the
3009 # baseline "io" module.
3010 self._check_abc_inheritance(io)
3011
Antoine Pitroue033e062010-10-29 10:38:18 +00003012 def _check_warn_on_dealloc(self, *args, **kwargs):
3013 f = open(*args, **kwargs)
3014 r = repr(f)
3015 with self.assertWarns(ResourceWarning) as cm:
3016 f = None
3017 support.gc_collect()
3018 self.assertIn(r, str(cm.warning.args[0]))
3019
3020 def test_warn_on_dealloc(self):
3021 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
3022 self._check_warn_on_dealloc(support.TESTFN, "wb")
3023 self._check_warn_on_dealloc(support.TESTFN, "w")
3024
3025 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
3026 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00003027 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00003028 for fd in fds:
3029 try:
3030 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02003031 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00003032 if e.errno != errno.EBADF:
3033 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00003034 self.addCleanup(cleanup_fds)
3035 r, w = os.pipe()
3036 fds += r, w
3037 self._check_warn_on_dealloc(r, *args, **kwargs)
3038 # When using closefd=False, there's no warning
3039 r, w = os.pipe()
3040 fds += r, w
3041 with warnings.catch_warnings(record=True) as recorded:
3042 open(r, *args, closefd=False, **kwargs)
3043 support.gc_collect()
3044 self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00003045
3046 def test_warn_on_dealloc_fd(self):
3047 self._check_warn_on_dealloc_fd("rb", buffering=0)
3048 self._check_warn_on_dealloc_fd("rb")
3049 self._check_warn_on_dealloc_fd("r")
3050
3051
Antoine Pitrou243757e2010-11-05 21:15:39 +00003052 def test_pickling(self):
3053 # Pickling file objects is forbidden
3054 for kwargs in [
3055 {"mode": "w"},
3056 {"mode": "wb"},
3057 {"mode": "wb", "buffering": 0},
3058 {"mode": "r"},
3059 {"mode": "rb"},
3060 {"mode": "rb", "buffering": 0},
3061 {"mode": "w+"},
3062 {"mode": "w+b"},
3063 {"mode": "w+b", "buffering": 0},
3064 ]:
3065 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
3066 with self.open(support.TESTFN, **kwargs) as f:
3067 self.assertRaises(TypeError, pickle.dumps, f, protocol)
3068
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003069 @unittest.skipUnless(fcntl, 'fcntl required for this test')
3070 def test_nonblock_pipe_write_bigbuf(self):
3071 self._test_nonblock_pipe_write(16*1024)
3072
3073 @unittest.skipUnless(fcntl, 'fcntl required for this test')
3074 def test_nonblock_pipe_write_smallbuf(self):
3075 self._test_nonblock_pipe_write(1024)
3076
3077 def _set_non_blocking(self, fd):
3078 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
3079 self.assertNotEqual(flags, -1)
3080 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
3081 self.assertEqual(res, 0)
3082
3083 def _test_nonblock_pipe_write(self, bufsize):
3084 sent = []
3085 received = []
3086 r, w = os.pipe()
3087 self._set_non_blocking(r)
3088 self._set_non_blocking(w)
3089
3090 # To exercise all code paths in the C implementation we need
3091 # to play with buffer sizes. For instance, if we choose a
3092 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
3093 # then we will never get a partial write of the buffer.
3094 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
3095 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
3096
3097 with rf, wf:
3098 for N in 9999, 73, 7574:
3099 try:
3100 i = 0
3101 while True:
3102 msg = bytes([i % 26 + 97]) * N
3103 sent.append(msg)
3104 wf.write(msg)
3105 i += 1
3106
3107 except self.BlockingIOError as e:
3108 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003109 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003110 sent[-1] = sent[-1][:e.characters_written]
3111 received.append(rf.read())
3112 msg = b'BLOCKED'
3113 wf.write(msg)
3114 sent.append(msg)
3115
3116 while True:
3117 try:
3118 wf.flush()
3119 break
3120 except self.BlockingIOError as e:
3121 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003122 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003123 self.assertEqual(e.characters_written, 0)
3124 received.append(rf.read())
3125
3126 received += iter(rf.read, None)
3127
3128 sent, received = b''.join(sent), b''.join(received)
3129 self.assertTrue(sent == received)
3130 self.assertTrue(wf.closed)
3131 self.assertTrue(rf.closed)
3132
Charles-François Natalidc3044c2012-01-09 22:40:02 +01003133 def test_create_fail(self):
3134 # 'x' mode fails if file is existing
3135 with self.open(support.TESTFN, 'w'):
3136 pass
3137 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
3138
3139 def test_create_writes(self):
3140 # 'x' mode opens for writing
3141 with self.open(support.TESTFN, 'xb') as f:
3142 f.write(b"spam")
3143 with self.open(support.TESTFN, 'rb') as f:
3144 self.assertEqual(b"spam", f.read())
3145
Christian Heimes7b648752012-09-10 14:48:43 +02003146 def test_open_allargs(self):
3147 # there used to be a buffer overflow in the parser for rawmode
3148 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
3149
3150
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003151class CMiscIOTest(MiscIOTest):
3152 io = io
3153
Serhiy Storchaka37a79a12013-05-28 16:24:45 +03003154 def test_readinto_buffer_overflow(self):
3155 # Issue #18025
3156 class BadReader(self.io.BufferedIOBase):
3157 def read(self, n=-1):
3158 return b'x' * 10**6
3159 bufio = BadReader()
3160 b = bytearray(2)
3161 self.assertRaises(ValueError, bufio.readinto, b)
3162
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003163class PyMiscIOTest(MiscIOTest):
3164 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003165
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003166
3167@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3168class SignalsTest(unittest.TestCase):
3169
3170 def setUp(self):
3171 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
3172
3173 def tearDown(self):
3174 signal.signal(signal.SIGALRM, self.oldalrm)
3175
3176 def alarm_interrupt(self, sig, frame):
3177 1/0
3178
3179 @unittest.skipUnless(threading, 'Threading required for this test.')
3180 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
3181 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00003182 invokes the signal handler, and bubbles up the exception raised
3183 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003184 read_results = []
3185 def _read():
Victor Stinnera9293352011-04-30 15:21:58 +02003186 if hasattr(signal, 'pthread_sigmask'):
3187 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003188 s = os.read(r, 1)
3189 read_results.append(s)
3190 t = threading.Thread(target=_read)
3191 t.daemon = True
3192 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00003193 fdopen_kwargs["closefd"] = False
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003194 try:
3195 wio = self.io.open(w, **fdopen_kwargs)
3196 t.start()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003197 # Fill the pipe enough that the write will be blocking.
3198 # It will be interrupted by the timer armed above. Since the
3199 # other thread has read one byte, the low-level write will
3200 # return with a successful (partial) result rather than an EINTR.
3201 # The buffered IO layer must check for pending signal
3202 # handlers, which in this case will invoke alarm_interrupt().
Victor Stinner775b2dd2013-07-15 19:53:13 +02003203 signal.alarm(1)
3204 try:
3205 self.assertRaises(ZeroDivisionError,
3206 wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
3207 finally:
3208 signal.alarm(0)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003209 t.join()
3210 # We got one byte, get another one and check that it isn't a
3211 # repeat of the first one.
3212 read_results.append(os.read(r, 1))
3213 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3214 finally:
3215 os.close(w)
3216 os.close(r)
3217 # This is deliberate. If we didn't close the file descriptor
3218 # before closing wio, wio would try to flush its internal
3219 # buffer, and block again.
3220 try:
3221 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003222 except OSError as e:
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003223 if e.errno != errno.EBADF:
3224 raise
3225
3226 def test_interrupted_write_unbuffered(self):
3227 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3228
3229 def test_interrupted_write_buffered(self):
3230 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3231
3232 def test_interrupted_write_text(self):
3233 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3234
Brett Cannon31f59292011-02-21 19:29:56 +00003235 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003236 def check_reentrant_write(self, data, **fdopen_kwargs):
3237 def on_alarm(*args):
3238 # Will be called reentrantly from the same thread
3239 wio.write(data)
3240 1/0
3241 signal.signal(signal.SIGALRM, on_alarm)
3242 r, w = os.pipe()
3243 wio = self.io.open(w, **fdopen_kwargs)
3244 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003245 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003246 # Either the reentrant call to wio.write() fails with RuntimeError,
3247 # or the signal handler raises ZeroDivisionError.
3248 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3249 while 1:
3250 for i in range(100):
3251 wio.write(data)
3252 wio.flush()
3253 # Make sure the buffer doesn't fill up and block further writes
3254 os.read(r, len(data) * 100)
3255 exc = cm.exception
3256 if isinstance(exc, RuntimeError):
3257 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3258 finally:
3259 wio.close()
3260 os.close(r)
3261
3262 def test_reentrant_write_buffered(self):
3263 self.check_reentrant_write(b"xy", mode="wb")
3264
3265 def test_reentrant_write_text(self):
3266 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3267
Antoine Pitrou707ce822011-02-25 21:24:11 +00003268 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3269 """Check that a buffered read, when it gets interrupted (either
3270 returning a partial result or EINTR), properly invokes the signal
3271 handler and retries if the latter returned successfully."""
3272 r, w = os.pipe()
3273 fdopen_kwargs["closefd"] = False
3274 def alarm_handler(sig, frame):
3275 os.write(w, b"bar")
3276 signal.signal(signal.SIGALRM, alarm_handler)
3277 try:
3278 rio = self.io.open(r, **fdopen_kwargs)
3279 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003280 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003281 # Expected behaviour:
3282 # - first raw read() returns partial b"foo"
3283 # - second raw read() returns EINTR
3284 # - third raw read() returns b"bar"
3285 self.assertEqual(decode(rio.read(6)), "foobar")
3286 finally:
3287 rio.close()
3288 os.close(w)
3289 os.close(r)
3290
Antoine Pitrou20db5112011-08-19 20:32:34 +02003291 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003292 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3293 mode="rb")
3294
Antoine Pitrou20db5112011-08-19 20:32:34 +02003295 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003296 self.check_interrupted_read_retry(lambda x: x,
3297 mode="r")
3298
3299 @unittest.skipUnless(threading, 'Threading required for this test.')
3300 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3301 """Check that a buffered write, when it gets interrupted (either
3302 returning a partial result or EINTR), properly invokes the signal
3303 handler and retries if the latter returned successfully."""
3304 select = support.import_module("select")
3305 # A quantity that exceeds the buffer size of an anonymous pipe's
3306 # write end.
Antoine Pitroue1a16742013-04-24 23:31:38 +02003307 N = support.PIPE_MAX_SIZE
Antoine Pitrou707ce822011-02-25 21:24:11 +00003308 r, w = os.pipe()
3309 fdopen_kwargs["closefd"] = False
3310 # We need a separate thread to read from the pipe and allow the
3311 # write() to finish. This thread is started after the SIGALRM is
3312 # received (forcing a first EINTR in write()).
3313 read_results = []
3314 write_finished = False
3315 def _read():
3316 while not write_finished:
3317 while r in select.select([r], [], [], 1.0)[0]:
3318 s = os.read(r, 1024)
3319 read_results.append(s)
3320 t = threading.Thread(target=_read)
3321 t.daemon = True
3322 def alarm1(sig, frame):
3323 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003324 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003325 def alarm2(sig, frame):
3326 t.start()
3327 signal.signal(signal.SIGALRM, alarm1)
3328 try:
3329 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003330 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003331 # Expected behaviour:
3332 # - first raw write() is partial (because of the limited pipe buffer
3333 # and the first alarm)
3334 # - second raw write() returns EINTR (because of the second alarm)
3335 # - subsequent write()s are successful (either partial or complete)
3336 self.assertEqual(N, wio.write(item * N))
3337 wio.flush()
3338 write_finished = True
3339 t.join()
3340 self.assertEqual(N, sum(len(x) for x in read_results))
3341 finally:
3342 write_finished = True
3343 os.close(w)
3344 os.close(r)
3345 # This is deliberate. If we didn't close the file descriptor
3346 # before closing wio, wio would try to flush its internal
3347 # buffer, and could block (in case of failure).
3348 try:
3349 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003350 except OSError as e:
Antoine Pitrou707ce822011-02-25 21:24:11 +00003351 if e.errno != errno.EBADF:
3352 raise
3353
Antoine Pitrou20db5112011-08-19 20:32:34 +02003354 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003355 self.check_interrupted_write_retry(b"x", mode="wb")
3356
Antoine Pitrou20db5112011-08-19 20:32:34 +02003357 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003358 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3359
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003360
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003361class CSignalsTest(SignalsTest):
3362 io = io
3363
3364class PySignalsTest(SignalsTest):
3365 io = pyio
3366
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003367 # Handling reentrancy issues would slow down _pyio even more, so the
3368 # tests are disabled.
3369 test_reentrant_write_buffered = None
3370 test_reentrant_write_text = None
3371
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003372
Ezio Melottidaa42c72013-03-23 16:30:16 +02003373def load_tests(*args):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003374 tests = (CIOTest, PyIOTest,
3375 CBufferedReaderTest, PyBufferedReaderTest,
3376 CBufferedWriterTest, PyBufferedWriterTest,
3377 CBufferedRWPairTest, PyBufferedRWPairTest,
3378 CBufferedRandomTest, PyBufferedRandomTest,
3379 StatefulIncrementalDecoderTest,
3380 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3381 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003382 CMiscIOTest, PyMiscIOTest,
3383 CSignalsTest, PySignalsTest,
3384 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003385
3386 # Put the namespaces of the IO module we are testing and some useful mock
3387 # classes in the __dict__ of each test.
3388 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou328ec742010-09-14 18:37:24 +00003389 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003390 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3391 c_io_ns = {name : getattr(io, name) for name in all_members}
3392 py_io_ns = {name : getattr(pyio, name) for name in all_members}
3393 globs = globals()
3394 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3395 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3396 # Avoid turning open into a bound method.
3397 py_io_ns["open"] = pyio.OpenWrapper
3398 for test in tests:
3399 if test.__name__.startswith("C"):
3400 for name, obj in c_io_ns.items():
3401 setattr(test, name, obj)
3402 elif test.__name__.startswith("Py"):
3403 for name, obj in py_io_ns.items():
3404 setattr(test, name, obj)
3405
Ezio Melottidaa42c72013-03-23 16:30:16 +02003406 suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests])
3407 return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00003408
3409if __name__ == "__main__":
Ezio Melottidaa42c72013-03-23 16:30:16 +02003410 unittest.main()