blob: ef1e05622bb95c4854290b3121a268d1f95e80d3 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Antoine Pitrou712cb732013-12-21 15:51:54 +010038from test.script_helper import assert_python_ok
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010047try:
48 import fcntl
49except ImportError:
50 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000051
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000052def _default_chunk_size():
53 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000054 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000055 return f._CHUNK_SIZE
56
57
Antoine Pitrou328ec742010-09-14 18:37:24 +000058class MockRawIOWithoutRead:
59 """A RawIO implementation without read(), so as to exercise the default
60 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000061
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000062 def __init__(self, read_stack=()):
63 self._read_stack = list(read_stack)
64 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000065 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000066 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000067
Guido van Rossum01a27522007-03-07 01:00:12 +000068 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000069 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000070 return len(b)
71
72 def writable(self):
73 return True
74
Guido van Rossum68bbcd22007-02-27 17:19:33 +000075 def fileno(self):
76 return 42
77
78 def readable(self):
79 return True
80
Guido van Rossum01a27522007-03-07 01:00:12 +000081 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000082 return True
83
Guido van Rossum01a27522007-03-07 01:00:12 +000084 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000085 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000086
87 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000088 return 0 # same comment as above
89
90 def readinto(self, buf):
91 self._reads += 1
92 max_len = len(buf)
93 try:
94 data = self._read_stack[0]
95 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +000096 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000097 return 0
98 if data is None:
99 del self._read_stack[0]
100 return None
101 n = len(data)
102 if len(data) <= max_len:
103 del self._read_stack[0]
104 buf[:n] = data
105 return n
106 else:
107 buf[:] = data[:max_len]
108 self._read_stack[0] = data[max_len:]
109 return max_len
110
111 def truncate(self, pos=None):
112 return pos
113
Antoine Pitrou328ec742010-09-14 18:37:24 +0000114class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
115 pass
116
117class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
118 pass
119
120
121class MockRawIO(MockRawIOWithoutRead):
122
123 def read(self, n=None):
124 self._reads += 1
125 try:
126 return self._read_stack.pop(0)
127 except:
128 self._extraneous_reads += 1
129 return b""
130
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000131class CMockRawIO(MockRawIO, io.RawIOBase):
132 pass
133
134class PyMockRawIO(MockRawIO, pyio.RawIOBase):
135 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000136
Guido van Rossuma9e20242007-03-08 00:43:48 +0000137
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000138class MisbehavedRawIO(MockRawIO):
139 def write(self, b):
140 return super().write(b) * 2
141
142 def read(self, n=None):
143 return super().read(n) * 2
144
145 def seek(self, pos, whence):
146 return -123
147
148 def tell(self):
149 return -456
150
151 def readinto(self, buf):
152 super().readinto(buf)
153 return len(buf) * 5
154
155class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
156 pass
157
158class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
159 pass
160
161
162class CloseFailureIO(MockRawIO):
163 closed = 0
164
165 def close(self):
166 if not self.closed:
167 self.closed = 1
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200168 raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000169
170class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
171 pass
172
173class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
174 pass
175
176
177class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000178
179 def __init__(self, data):
180 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000181 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000182
183 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000184 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000185 self.read_history.append(None if res is None else len(res))
186 return res
187
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000188 def readinto(self, b):
189 res = super().readinto(b)
190 self.read_history.append(res)
191 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000192
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000193class CMockFileIO(MockFileIO, io.BytesIO):
194 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000195
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000196class PyMockFileIO(MockFileIO, pyio.BytesIO):
197 pass
198
199
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000200class MockUnseekableIO:
201 def seekable(self):
202 return False
203
204 def seek(self, *args):
205 raise self.UnsupportedOperation("not seekable")
206
207 def tell(self, *args):
208 raise self.UnsupportedOperation("not seekable")
209
210class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
211 UnsupportedOperation = io.UnsupportedOperation
212
213class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
214 UnsupportedOperation = pyio.UnsupportedOperation
215
216
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000217class MockNonBlockWriterIO:
218
219 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000220 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000221 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000222
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000223 def pop_written(self):
224 s = b"".join(self._write_stack)
225 self._write_stack[:] = []
226 return s
227
228 def block_on(self, char):
229 """Block when a given char is encountered."""
230 self._blocker_char = char
231
232 def readable(self):
233 return True
234
235 def seekable(self):
236 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000237
Guido van Rossum01a27522007-03-07 01:00:12 +0000238 def writable(self):
239 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000240
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000241 def write(self, b):
242 b = bytes(b)
243 n = -1
244 if self._blocker_char:
245 try:
246 n = b.index(self._blocker_char)
247 except ValueError:
248 pass
249 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100250 if n > 0:
251 # write data up to the first blocker
252 self._write_stack.append(b[:n])
253 return n
254 else:
255 # cancel blocker and indicate would block
256 self._blocker_char = None
257 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000258 self._write_stack.append(b)
259 return len(b)
260
261class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
262 BlockingIOError = io.BlockingIOError
263
264class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
265 BlockingIOError = pyio.BlockingIOError
266
Guido van Rossuma9e20242007-03-08 00:43:48 +0000267
Guido van Rossum28524c72007-02-27 05:47:44 +0000268class IOTest(unittest.TestCase):
269
Neal Norwitze7789b12008-03-24 06:18:09 +0000270 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000271 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000272
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000273 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000274 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000275
Guido van Rossum28524c72007-02-27 05:47:44 +0000276 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000277 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000278 f.truncate(0)
279 self.assertEqual(f.tell(), 5)
280 f.seek(0)
281
282 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000283 self.assertEqual(f.seek(0), 0)
284 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000285 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000286 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000287 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000288 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000289 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000290 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000291 self.assertEqual(f.seek(-1, 2), 13)
292 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000293
Guido van Rossum87429772007-04-10 21:06:59 +0000294 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000295 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000296 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000297
Guido van Rossum9b76da62007-04-11 01:09:03 +0000298 def read_ops(self, f, buffered=False):
299 data = f.read(5)
300 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000301 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000302 self.assertEqual(f.readinto(data), 5)
303 self.assertEqual(data, b" worl")
304 self.assertEqual(f.readinto(data), 2)
305 self.assertEqual(len(data), 5)
306 self.assertEqual(data[:2], b"d\n")
307 self.assertEqual(f.seek(0), 0)
308 self.assertEqual(f.read(20), b"hello world\n")
309 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000310 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000311 self.assertEqual(f.seek(-6, 2), 6)
312 self.assertEqual(f.read(5), b"world")
313 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000314 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000315 self.assertEqual(f.seek(-6, 1), 5)
316 self.assertEqual(f.read(5), b" worl")
317 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000318 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000319 if buffered:
320 f.seek(0)
321 self.assertEqual(f.read(), b"hello world\n")
322 f.seek(6)
323 self.assertEqual(f.read(), b"world\n")
324 self.assertEqual(f.read(), b"")
325
Guido van Rossum34d69e52007-04-10 20:08:41 +0000326 LARGE = 2**31
327
Guido van Rossum53807da2007-04-10 19:01:47 +0000328 def large_file_ops(self, f):
329 assert f.readable()
330 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000331 self.assertEqual(f.seek(self.LARGE), self.LARGE)
332 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000333 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000334 self.assertEqual(f.tell(), self.LARGE + 3)
335 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000336 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000337 self.assertEqual(f.tell(), self.LARGE + 2)
338 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000339 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000340 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000341 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
342 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000343 self.assertEqual(f.read(2), b"x")
344
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000345 def test_invalid_operations(self):
346 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000347 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000348 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000349 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000350 self.assertRaises(exc, fp.read)
351 self.assertRaises(exc, fp.readline)
352 with self.open(support.TESTFN, "wb", buffering=0) as fp:
353 self.assertRaises(exc, fp.read)
354 self.assertRaises(exc, fp.readline)
355 with self.open(support.TESTFN, "rb", buffering=0) as fp:
356 self.assertRaises(exc, fp.write, b"blah")
357 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000358 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000359 self.assertRaises(exc, fp.write, b"blah")
360 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000361 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000362 self.assertRaises(exc, fp.write, "blah")
363 self.assertRaises(exc, fp.writelines, ["blah\n"])
364 # Non-zero seeking from current or end pos
365 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
366 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000367
Antoine Pitrou13348842012-01-29 18:36:34 +0100368 def test_open_handles_NUL_chars(self):
369 fn_with_NUL = 'foo\0bar'
370 self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
371 self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')
372
Guido van Rossum28524c72007-02-27 05:47:44 +0000373 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000374 with self.open(support.TESTFN, "wb", buffering=0) as f:
375 self.assertEqual(f.readable(), False)
376 self.assertEqual(f.writable(), True)
377 self.assertEqual(f.seekable(), True)
378 self.write_ops(f)
379 with self.open(support.TESTFN, "rb", buffering=0) as f:
380 self.assertEqual(f.readable(), True)
381 self.assertEqual(f.writable(), False)
382 self.assertEqual(f.seekable(), True)
383 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000384
Guido van Rossum87429772007-04-10 21:06:59 +0000385 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000386 with self.open(support.TESTFN, "wb") as f:
387 self.assertEqual(f.readable(), False)
388 self.assertEqual(f.writable(), True)
389 self.assertEqual(f.seekable(), True)
390 self.write_ops(f)
391 with self.open(support.TESTFN, "rb") as f:
392 self.assertEqual(f.readable(), True)
393 self.assertEqual(f.writable(), False)
394 self.assertEqual(f.seekable(), True)
395 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000396
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000397 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000398 with self.open(support.TESTFN, "wb") as f:
399 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
400 with self.open(support.TESTFN, "rb") as f:
401 self.assertEqual(f.readline(), b"abc\n")
402 self.assertEqual(f.readline(10), b"def\n")
403 self.assertEqual(f.readline(2), b"xy")
404 self.assertEqual(f.readline(4), b"zzy\n")
405 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000406 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000407 self.assertRaises(TypeError, f.readline, 5.3)
408 with self.open(support.TESTFN, "r") as f:
409 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000410
Guido van Rossum28524c72007-02-27 05:47:44 +0000411 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000412 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000413 self.write_ops(f)
414 data = f.getvalue()
415 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000416 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000417 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000418
Guido van Rossum53807da2007-04-10 19:01:47 +0000419 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000420 # On Windows and Mac OSX this test comsumes large resources; It takes
421 # a long time to build the >2GB file and takes >2GB of disk space
422 # therefore the resource must be enabled to run this test.
423 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600424 support.requires(
425 'largefile',
426 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000427 with self.open(support.TESTFN, "w+b", 0) as f:
428 self.large_file_ops(f)
429 with self.open(support.TESTFN, "w+b") as f:
430 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000431
432 def test_with_open(self):
433 for bufsize in (0, 1, 100):
434 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000435 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000436 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000437 self.assertEqual(f.closed, True)
438 f = None
439 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000440 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000441 1/0
442 except ZeroDivisionError:
443 self.assertEqual(f.closed, True)
444 else:
445 self.fail("1/0 didn't raise an exception")
446
Antoine Pitrou08838b62009-01-21 00:55:13 +0000447 # issue 5008
448 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000449 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000450 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000451 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000452 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000453 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000454 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000455 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000456 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000457
Guido van Rossum87429772007-04-10 21:06:59 +0000458 def test_destructor(self):
459 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000460 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000461 def __del__(self):
462 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000463 try:
464 f = super().__del__
465 except AttributeError:
466 pass
467 else:
468 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000469 def close(self):
470 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000471 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000472 def flush(self):
473 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000474 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000475 with support.check_warnings(('', ResourceWarning)):
476 f = MyFileIO(support.TESTFN, "wb")
477 f.write(b"xxx")
478 del f
479 support.gc_collect()
480 self.assertEqual(record, [1, 2, 3])
481 with self.open(support.TESTFN, "rb") as f:
482 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000483
484 def _check_base_destructor(self, base):
485 record = []
486 class MyIO(base):
487 def __init__(self):
488 # This exercises the availability of attributes on object
489 # destruction.
490 # (in the C version, close() is called by the tp_dealloc
491 # function, not by __del__)
492 self.on_del = 1
493 self.on_close = 2
494 self.on_flush = 3
495 def __del__(self):
496 record.append(self.on_del)
497 try:
498 f = super().__del__
499 except AttributeError:
500 pass
501 else:
502 f()
503 def close(self):
504 record.append(self.on_close)
505 super().close()
506 def flush(self):
507 record.append(self.on_flush)
508 super().flush()
509 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000510 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000511 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000512 self.assertEqual(record, [1, 2, 3])
513
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000514 def test_IOBase_destructor(self):
515 self._check_base_destructor(self.IOBase)
516
517 def test_RawIOBase_destructor(self):
518 self._check_base_destructor(self.RawIOBase)
519
520 def test_BufferedIOBase_destructor(self):
521 self._check_base_destructor(self.BufferedIOBase)
522
523 def test_TextIOBase_destructor(self):
524 self._check_base_destructor(self.TextIOBase)
525
Guido van Rossum87429772007-04-10 21:06:59 +0000526 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000527 with self.open(support.TESTFN, "wb") as f:
528 f.write(b"xxx")
529 with self.open(support.TESTFN, "rb") as f:
530 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000531
Guido van Rossumd4103952007-04-12 05:44:49 +0000532 def test_array_writes(self):
533 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000534 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000535 with self.open(support.TESTFN, "wb", 0) as f:
536 self.assertEqual(f.write(a), n)
537 with self.open(support.TESTFN, "wb") as f:
538 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000539
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000540 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000541 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000542 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000543
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000544 def test_read_closed(self):
545 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000546 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000547 with self.open(support.TESTFN, "r") as f:
548 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000549 self.assertEqual(file.read(), "egg\n")
550 file.seek(0)
551 file.close()
552 self.assertRaises(ValueError, file.read)
553
554 def test_no_closefd_with_filename(self):
555 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000556 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000557
558 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000559 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000560 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000561 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000562 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000563 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000564 self.assertEqual(file.buffer.raw.closefd, False)
565
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000566 def test_garbage_collection(self):
567 # FileIO objects are collected, and collecting them flushes
568 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000569 with support.check_warnings(('', ResourceWarning)):
570 f = self.FileIO(support.TESTFN, "wb")
571 f.write(b"abcxxx")
572 f.f = f
573 wr = weakref.ref(f)
574 del f
575 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000576 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000577 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000578 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000579
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000580 def test_unbounded_file(self):
581 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
582 zero = "/dev/zero"
583 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000584 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000585 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000586 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000587 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000588 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000589 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000590 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000591 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000592 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000593 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000594 self.assertRaises(OverflowError, f.read)
595
Antoine Pitrou6be88762010-05-03 16:48:20 +0000596 def test_flush_error_on_close(self):
597 f = self.open(support.TESTFN, "wb", buffering=0)
598 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200599 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000600 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200601 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600602 self.assertTrue(f.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000603
604 def test_multi_close(self):
605 f = self.open(support.TESTFN, "wb", buffering=0)
606 f.close()
607 f.close()
608 f.close()
609 self.assertRaises(ValueError, f.flush)
610
Antoine Pitrou328ec742010-09-14 18:37:24 +0000611 def test_RawIOBase_read(self):
612 # Exercise the default RawIOBase.read() implementation (which calls
613 # readinto() internally).
614 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
615 self.assertEqual(rawio.read(2), b"ab")
616 self.assertEqual(rawio.read(2), b"c")
617 self.assertEqual(rawio.read(2), b"d")
618 self.assertEqual(rawio.read(2), None)
619 self.assertEqual(rawio.read(2), b"ef")
620 self.assertEqual(rawio.read(2), b"g")
621 self.assertEqual(rawio.read(2), None)
622 self.assertEqual(rawio.read(2), b"")
623
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400624 def test_types_have_dict(self):
625 test = (
626 self.IOBase(),
627 self.RawIOBase(),
628 self.TextIOBase(),
629 self.StringIO(),
630 self.BytesIO()
631 )
632 for obj in test:
633 self.assertTrue(hasattr(obj, "__dict__"))
634
Ross Lagerwall59142db2011-10-31 20:34:46 +0200635 def test_opener(self):
636 with self.open(support.TESTFN, "w") as f:
637 f.write("egg\n")
638 fd = os.open(support.TESTFN, os.O_RDONLY)
639 def opener(path, flags):
640 return fd
641 with self.open("non-existent", "r", opener=opener) as f:
642 self.assertEqual(f.read(), "egg\n")
643
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200644 def test_fileio_closefd(self):
645 # Issue #4841
646 with self.open(__file__, 'rb') as f1, \
647 self.open(__file__, 'rb') as f2:
648 fileio = self.FileIO(f1.fileno(), closefd=False)
649 # .__init__() must not close f1
650 fileio.__init__(f2.fileno(), closefd=False)
651 f1.readline()
652 # .close() must not close f2
653 fileio.close()
654 f2.readline()
655
656
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000657class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +0200658
659 def test_IOBase_finalize(self):
660 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
661 # class which inherits IOBase and an object of this class are caught
662 # in a reference cycle and close() is already in the method cache.
663 class MyIO(self.IOBase):
664 def close(self):
665 pass
666
667 # create an instance to populate the method cache
668 MyIO()
669 obj = MyIO()
670 obj.obj = obj
671 wr = weakref.ref(obj)
672 del MyIO
673 del obj
674 support.gc_collect()
675 self.assertTrue(wr() is None, wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000676
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000677class PyIOTest(IOTest):
678 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000679
Guido van Rossuma9e20242007-03-08 00:43:48 +0000680
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000681class CommonBufferedTests:
682 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
683
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000684 def test_detach(self):
685 raw = self.MockRawIO()
686 buf = self.tp(raw)
687 self.assertIs(buf.detach(), raw)
688 self.assertRaises(ValueError, buf.detach)
689
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000690 def test_fileno(self):
691 rawio = self.MockRawIO()
692 bufio = self.tp(rawio)
693
Ezio Melottib3aedd42010-11-20 19:04:17 +0000694 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000695
Zachary Ware9fe6d862013-12-08 00:20:35 -0600696 @unittest.skip('test having existential crisis')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000697 def test_no_fileno(self):
698 # XXX will we always have fileno() function? If so, kill
699 # this test. Else, write it.
700 pass
701
702 def test_invalid_args(self):
703 rawio = self.MockRawIO()
704 bufio = self.tp(rawio)
705 # Invalid whence
706 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +0200707 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000708
709 def test_override_destructor(self):
710 tp = self.tp
711 record = []
712 class MyBufferedIO(tp):
713 def __del__(self):
714 record.append(1)
715 try:
716 f = super().__del__
717 except AttributeError:
718 pass
719 else:
720 f()
721 def close(self):
722 record.append(2)
723 super().close()
724 def flush(self):
725 record.append(3)
726 super().flush()
727 rawio = self.MockRawIO()
728 bufio = MyBufferedIO(rawio)
729 writable = bufio.writable()
730 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000731 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000732 if writable:
733 self.assertEqual(record, [1, 2, 3])
734 else:
735 self.assertEqual(record, [1, 2])
736
737 def test_context_manager(self):
738 # Test usability as a context manager
739 rawio = self.MockRawIO()
740 bufio = self.tp(rawio)
741 def _with():
742 with bufio:
743 pass
744 _with()
745 # bufio should now be closed, and using it a second time should raise
746 # a ValueError.
747 self.assertRaises(ValueError, _with)
748
749 def test_error_through_destructor(self):
750 # Test that the exception state is not modified by a destructor,
751 # even if close() fails.
752 rawio = self.CloseFailureIO()
753 def f():
754 self.tp(rawio).xyzzy
755 with support.captured_output("stderr") as s:
756 self.assertRaises(AttributeError, f)
757 s = s.getvalue().strip()
758 if s:
759 # The destructor *may* have printed an unraisable error, check it
760 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200761 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000762 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000763
Antoine Pitrou716c4442009-05-23 19:04:03 +0000764 def test_repr(self):
765 raw = self.MockRawIO()
766 b = self.tp(raw)
767 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
768 self.assertEqual(repr(b), "<%s>" % clsname)
769 raw.name = "dummy"
770 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
771 raw.name = b"dummy"
772 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
773
Antoine Pitrou6be88762010-05-03 16:48:20 +0000774 def test_flush_error_on_close(self):
775 raw = self.MockRawIO()
776 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200777 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000778 raw.flush = bad_flush
779 b = self.tp(raw)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200780 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600781 self.assertTrue(b.closed)
782
783 def test_close_error_on_close(self):
784 raw = self.MockRawIO()
785 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200786 raise OSError('flush')
Benjamin Peterson68623612012-12-20 11:53:11 -0600787 def bad_close():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200788 raise OSError('close')
Benjamin Peterson68623612012-12-20 11:53:11 -0600789 raw.close = bad_close
790 b = self.tp(raw)
791 b.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200792 with self.assertRaises(OSError) as err: # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600793 b.close()
794 self.assertEqual(err.exception.args, ('close',))
795 self.assertEqual(err.exception.__context__.args, ('flush',))
796 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000797
798 def test_multi_close(self):
799 raw = self.MockRawIO()
800 b = self.tp(raw)
801 b.close()
802 b.close()
803 b.close()
804 self.assertRaises(ValueError, b.flush)
805
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000806 def test_unseekable(self):
807 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
808 self.assertRaises(self.UnsupportedOperation, bufio.tell)
809 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
810
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000811 def test_readonly_attributes(self):
812 raw = self.MockRawIO()
813 buf = self.tp(raw)
814 x = self.MockRawIO()
815 with self.assertRaises(AttributeError):
816 buf.raw = x
817
Guido van Rossum78892e42007-04-06 17:31:18 +0000818
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200819class SizeofTest:
820
821 @support.cpython_only
822 def test_sizeof(self):
823 bufsize1 = 4096
824 bufsize2 = 8192
825 rawio = self.MockRawIO()
826 bufio = self.tp(rawio, buffer_size=bufsize1)
827 size = sys.getsizeof(bufio) - bufsize1
828 rawio = self.MockRawIO()
829 bufio = self.tp(rawio, buffer_size=bufsize2)
830 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
831
Jesus Ceadc469452012-10-04 12:37:56 +0200832 @support.cpython_only
833 def test_buffer_freeing(self) :
834 bufsize = 4096
835 rawio = self.MockRawIO()
836 bufio = self.tp(rawio, buffer_size=bufsize)
837 size = sys.getsizeof(bufio) - bufsize
838 bufio.close()
839 self.assertEqual(sys.getsizeof(bufio), size)
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200840
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000841class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
842 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000843
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000844 def test_constructor(self):
845 rawio = self.MockRawIO([b"abc"])
846 bufio = self.tp(rawio)
847 bufio.__init__(rawio)
848 bufio.__init__(rawio, buffer_size=1024)
849 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000850 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000851 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
852 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
853 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
854 rawio = self.MockRawIO([b"abc"])
855 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000856 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000857
Serhiy Storchaka61e24932014-02-12 10:52:35 +0200858 def test_uninitialized(self):
859 bufio = self.tp.__new__(self.tp)
860 del bufio
861 bufio = self.tp.__new__(self.tp)
862 self.assertRaisesRegex((ValueError, AttributeError),
863 'uninitialized|has no attribute',
864 bufio.read, 0)
865 bufio.__init__(self.MockRawIO())
866 self.assertEqual(bufio.read(0), b'')
867
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000868 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000869 for arg in (None, 7):
870 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
871 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000872 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000873 # Invalid args
874 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000875
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000876 def test_read1(self):
877 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
878 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000879 self.assertEqual(b"a", bufio.read(1))
880 self.assertEqual(b"b", bufio.read1(1))
881 self.assertEqual(rawio._reads, 1)
882 self.assertEqual(b"c", bufio.read1(100))
883 self.assertEqual(rawio._reads, 1)
884 self.assertEqual(b"d", bufio.read1(100))
885 self.assertEqual(rawio._reads, 2)
886 self.assertEqual(b"efg", bufio.read1(100))
887 self.assertEqual(rawio._reads, 3)
888 self.assertEqual(b"", bufio.read1(100))
889 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000890 # Invalid args
891 self.assertRaises(ValueError, bufio.read1, -1)
892
893 def test_readinto(self):
894 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
895 bufio = self.tp(rawio)
896 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000897 self.assertEqual(bufio.readinto(b), 2)
898 self.assertEqual(b, b"ab")
899 self.assertEqual(bufio.readinto(b), 2)
900 self.assertEqual(b, b"cd")
901 self.assertEqual(bufio.readinto(b), 2)
902 self.assertEqual(b, b"ef")
903 self.assertEqual(bufio.readinto(b), 1)
904 self.assertEqual(b, b"gf")
905 self.assertEqual(bufio.readinto(b), 0)
906 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +0200907 rawio = self.MockRawIO((b"abc", None))
908 bufio = self.tp(rawio)
909 self.assertEqual(bufio.readinto(b), 2)
910 self.assertEqual(b, b"ab")
911 self.assertEqual(bufio.readinto(b), 1)
912 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000913
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000914 def test_readlines(self):
915 def bufio():
916 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
917 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000918 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
919 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
920 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000921
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000922 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000923 data = b"abcdefghi"
924 dlen = len(data)
925
926 tests = [
927 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
928 [ 100, [ 3, 3, 3], [ dlen ] ],
929 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
930 ]
931
932 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000933 rawio = self.MockFileIO(data)
934 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000935 pos = 0
936 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +0000937 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000938 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000939 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +0000940 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000941
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000942 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000943 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000944 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
945 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000946 self.assertEqual(b"abcd", bufio.read(6))
947 self.assertEqual(b"e", bufio.read(1))
948 self.assertEqual(b"fg", bufio.read())
949 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +0200950 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000951 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000952
Victor Stinnera80987f2011-05-25 22:47:16 +0200953 rawio = self.MockRawIO((b"a", None, None))
954 self.assertEqual(b"a", rawio.readall())
955 self.assertIsNone(rawio.readall())
956
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000957 def test_read_past_eof(self):
958 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
959 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000960
Ezio Melottib3aedd42010-11-20 19:04:17 +0000961 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000962
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000963 def test_read_all(self):
964 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
965 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000966
Ezio Melottib3aedd42010-11-20 19:04:17 +0000967 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000968
Victor Stinner45df8202010-04-28 22:31:17 +0000969 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +0000970 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000971 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000972 try:
973 # Write out many bytes with exactly the same number of 0's,
974 # 1's... 255's. This will help us check that concurrent reading
975 # doesn't duplicate or forget contents.
976 N = 1000
977 l = list(range(256)) * N
978 random.shuffle(l)
979 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000980 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +0000981 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000982 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000983 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000984 errors = []
985 results = []
986 def f():
987 try:
988 # Intra-buffer read then buffer-flushing read
989 for n in cycle([1, 19]):
990 s = bufio.read(n)
991 if not s:
992 break
993 # list.append() is atomic
994 results.append(s)
995 except Exception as e:
996 errors.append(e)
997 raise
998 threads = [threading.Thread(target=f) for x in range(20)]
999 for t in threads:
1000 t.start()
1001 time.sleep(0.02) # yield
1002 for t in threads:
1003 t.join()
1004 self.assertFalse(errors,
1005 "the following exceptions were caught: %r" % errors)
1006 s = b''.join(results)
1007 for i in range(256):
1008 c = bytes(bytearray([i]))
1009 self.assertEqual(s.count(c), N)
1010 finally:
1011 support.unlink(support.TESTFN)
1012
Antoine Pitrou1e44fec2011-10-04 12:26:20 +02001013 def test_unseekable(self):
1014 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1015 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1016 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1017 bufio.read(1)
1018 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1019 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1020
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001021 def test_misbehaved_io(self):
1022 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1023 bufio = self.tp(rawio)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001024 self.assertRaises(OSError, bufio.seek, 0)
1025 self.assertRaises(OSError, bufio.tell)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001026
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001027 def test_no_extraneous_read(self):
1028 # Issue #9550; when the raw IO object has satisfied the read request,
1029 # we should not issue any additional reads, otherwise it may block
1030 # (e.g. socket).
1031 bufsize = 16
1032 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1033 rawio = self.MockRawIO([b"x" * n])
1034 bufio = self.tp(rawio, bufsize)
1035 self.assertEqual(bufio.read(n), b"x" * n)
1036 # Simple case: one raw read is enough to satisfy the request.
1037 self.assertEqual(rawio._extraneous_reads, 0,
1038 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1039 # A more complex case where two raw reads are needed to satisfy
1040 # the request.
1041 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1042 bufio = self.tp(rawio, bufsize)
1043 self.assertEqual(bufio.read(n), b"x" * n)
1044 self.assertEqual(rawio._extraneous_reads, 0,
1045 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1046
1047
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001048class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001049 tp = io.BufferedReader
1050
1051 def test_constructor(self):
1052 BufferedReaderTest.test_constructor(self)
1053 # The allocation can succeed on 32-bit builds, e.g. with more
1054 # than 2GB RAM and a 64-bit kernel.
1055 if sys.maxsize > 0x7FFFFFFF:
1056 rawio = self.MockRawIO()
1057 bufio = self.tp(rawio)
1058 self.assertRaises((OverflowError, MemoryError, ValueError),
1059 bufio.__init__, rawio, sys.maxsize)
1060
1061 def test_initialization(self):
1062 rawio = self.MockRawIO([b"abc"])
1063 bufio = self.tp(rawio)
1064 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1065 self.assertRaises(ValueError, bufio.read)
1066 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1067 self.assertRaises(ValueError, bufio.read)
1068 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1069 self.assertRaises(ValueError, bufio.read)
1070
1071 def test_misbehaved_io_read(self):
1072 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1073 bufio = self.tp(rawio)
1074 # _pyio.BufferedReader seems to implement reading different, so that
1075 # checking this is not so easy.
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001076 self.assertRaises(OSError, bufio.read, 10)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001077
1078 def test_garbage_collection(self):
1079 # C BufferedReader objects are collected.
1080 # The Python version has __del__, so it ends into gc.garbage instead
Antoine Pitrou796564c2013-07-30 19:59:21 +02001081 with support.check_warnings(('', ResourceWarning)):
1082 rawio = self.FileIO(support.TESTFN, "w+b")
1083 f = self.tp(rawio)
1084 f.f = f
1085 wr = weakref.ref(f)
1086 del f
1087 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001088 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001089
R David Murray67bfe802013-02-23 21:51:05 -05001090 def test_args_error(self):
1091 # Issue #17275
1092 with self.assertRaisesRegex(TypeError, "BufferedReader"):
1093 self.tp(io.BytesIO(), 1024, 1024, 1024)
1094
1095
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001096class PyBufferedReaderTest(BufferedReaderTest):
1097 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001098
Guido van Rossuma9e20242007-03-08 00:43:48 +00001099
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001100class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1101 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001102
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001103 def test_constructor(self):
1104 rawio = self.MockRawIO()
1105 bufio = self.tp(rawio)
1106 bufio.__init__(rawio)
1107 bufio.__init__(rawio, buffer_size=1024)
1108 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001109 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001110 bufio.flush()
1111 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1112 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1113 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1114 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001115 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001116 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001117 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001118
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001119 def test_uninitialized(self):
1120 bufio = self.tp.__new__(self.tp)
1121 del bufio
1122 bufio = self.tp.__new__(self.tp)
1123 self.assertRaisesRegex((ValueError, AttributeError),
1124 'uninitialized|has no attribute',
1125 bufio.write, b'')
1126 bufio.__init__(self.MockRawIO())
1127 self.assertEqual(bufio.write(b''), 0)
1128
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001129 def test_detach_flush(self):
1130 raw = self.MockRawIO()
1131 buf = self.tp(raw)
1132 buf.write(b"howdy!")
1133 self.assertFalse(raw._write_stack)
1134 buf.detach()
1135 self.assertEqual(raw._write_stack, [b"howdy!"])
1136
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001137 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001138 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001139 writer = self.MockRawIO()
1140 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001141 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001142 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001143
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001144 def test_write_overflow(self):
1145 writer = self.MockRawIO()
1146 bufio = self.tp(writer, 8)
1147 contents = b"abcdefghijklmnop"
1148 for n in range(0, len(contents), 3):
1149 bufio.write(contents[n:n+3])
1150 flushed = b"".join(writer._write_stack)
1151 # At least (total - 8) bytes were implicitly flushed, perhaps more
1152 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001153 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001154
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001155 def check_writes(self, intermediate_func):
1156 # Lots of writes, test the flushed output is as expected.
1157 contents = bytes(range(256)) * 1000
1158 n = 0
1159 writer = self.MockRawIO()
1160 bufio = self.tp(writer, 13)
1161 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1162 def gen_sizes():
1163 for size in count(1):
1164 for i in range(15):
1165 yield size
1166 sizes = gen_sizes()
1167 while n < len(contents):
1168 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001169 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001170 intermediate_func(bufio)
1171 n += size
1172 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001173 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001174
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001175 def test_writes(self):
1176 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001177
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001178 def test_writes_and_flushes(self):
1179 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001180
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001181 def test_writes_and_seeks(self):
1182 def _seekabs(bufio):
1183 pos = bufio.tell()
1184 bufio.seek(pos + 1, 0)
1185 bufio.seek(pos - 1, 0)
1186 bufio.seek(pos, 0)
1187 self.check_writes(_seekabs)
1188 def _seekrel(bufio):
1189 pos = bufio.seek(0, 1)
1190 bufio.seek(+1, 1)
1191 bufio.seek(-1, 1)
1192 bufio.seek(pos, 0)
1193 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001194
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001195 def test_writes_and_truncates(self):
1196 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001197
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001198 def test_write_non_blocking(self):
1199 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001200 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001201
Ezio Melottib3aedd42010-11-20 19:04:17 +00001202 self.assertEqual(bufio.write(b"abcd"), 4)
1203 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001204 # 1 byte will be written, the rest will be buffered
1205 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001206 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001207
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001208 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1209 raw.block_on(b"0")
1210 try:
1211 bufio.write(b"opqrwxyz0123456789")
1212 except self.BlockingIOError as e:
1213 written = e.characters_written
1214 else:
1215 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001216 self.assertEqual(written, 16)
1217 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001218 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001219
Ezio Melottib3aedd42010-11-20 19:04:17 +00001220 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001221 s = raw.pop_written()
1222 # Previously buffered bytes were flushed
1223 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001224
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001225 def test_write_and_rewind(self):
1226 raw = io.BytesIO()
1227 bufio = self.tp(raw, 4)
1228 self.assertEqual(bufio.write(b"abcdef"), 6)
1229 self.assertEqual(bufio.tell(), 6)
1230 bufio.seek(0, 0)
1231 self.assertEqual(bufio.write(b"XY"), 2)
1232 bufio.seek(6, 0)
1233 self.assertEqual(raw.getvalue(), b"XYcdef")
1234 self.assertEqual(bufio.write(b"123456"), 6)
1235 bufio.flush()
1236 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001237
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001238 def test_flush(self):
1239 writer = self.MockRawIO()
1240 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001241 bufio.write(b"abc")
1242 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001243 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001244
Antoine Pitrou131a4892012-10-16 22:57:11 +02001245 def test_writelines(self):
1246 l = [b'ab', b'cd', b'ef']
1247 writer = self.MockRawIO()
1248 bufio = self.tp(writer, 8)
1249 bufio.writelines(l)
1250 bufio.flush()
1251 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1252
1253 def test_writelines_userlist(self):
1254 l = UserList([b'ab', b'cd', b'ef'])
1255 writer = self.MockRawIO()
1256 bufio = self.tp(writer, 8)
1257 bufio.writelines(l)
1258 bufio.flush()
1259 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1260
1261 def test_writelines_error(self):
1262 writer = self.MockRawIO()
1263 bufio = self.tp(writer, 8)
1264 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1265 self.assertRaises(TypeError, bufio.writelines, None)
1266 self.assertRaises(TypeError, bufio.writelines, 'abc')
1267
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001268 def test_destructor(self):
1269 writer = self.MockRawIO()
1270 bufio = self.tp(writer, 8)
1271 bufio.write(b"abc")
1272 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001273 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001274 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001275
1276 def test_truncate(self):
1277 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001278 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001279 bufio = self.tp(raw, 8)
1280 bufio.write(b"abcdef")
1281 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001282 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001283 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001284 self.assertEqual(f.read(), b"abc")
1285
Victor Stinner45df8202010-04-28 22:31:17 +00001286 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001287 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001288 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001289 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001290 # Write out many bytes from many threads and test they were
1291 # all flushed.
1292 N = 1000
1293 contents = bytes(range(256)) * N
1294 sizes = cycle([1, 19])
1295 n = 0
1296 queue = deque()
1297 while n < len(contents):
1298 size = next(sizes)
1299 queue.append(contents[n:n+size])
1300 n += size
1301 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001302 # We use a real file object because it allows us to
1303 # exercise situations where the GIL is released before
1304 # writing the buffer to the raw streams. This is in addition
1305 # to concurrency issues due to switching threads in the middle
1306 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001307 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001308 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001309 errors = []
1310 def f():
1311 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001312 while True:
1313 try:
1314 s = queue.popleft()
1315 except IndexError:
1316 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001317 bufio.write(s)
1318 except Exception as e:
1319 errors.append(e)
1320 raise
1321 threads = [threading.Thread(target=f) for x in range(20)]
1322 for t in threads:
1323 t.start()
1324 time.sleep(0.02) # yield
1325 for t in threads:
1326 t.join()
1327 self.assertFalse(errors,
1328 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001329 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001330 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001331 s = f.read()
1332 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001333 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001334 finally:
1335 support.unlink(support.TESTFN)
1336
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001337 def test_misbehaved_io(self):
1338 rawio = self.MisbehavedRawIO()
1339 bufio = self.tp(rawio, 5)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001340 self.assertRaises(OSError, bufio.seek, 0)
1341 self.assertRaises(OSError, bufio.tell)
1342 self.assertRaises(OSError, bufio.write, b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001343
Florent Xicluna109d5732012-07-07 17:03:22 +02001344 def test_max_buffer_size_removal(self):
1345 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001346 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001347
Benjamin Peterson68623612012-12-20 11:53:11 -06001348 def test_write_error_on_close(self):
1349 raw = self.MockRawIO()
1350 def bad_write(b):
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001351 raise OSError()
Benjamin Peterson68623612012-12-20 11:53:11 -06001352 raw.write = bad_write
1353 b = self.tp(raw)
1354 b.write(b'spam')
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001355 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001356 self.assertTrue(b.closed)
1357
Benjamin Peterson59406a92009-03-26 17:10:29 +00001358
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001359class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001360 tp = io.BufferedWriter
1361
1362 def test_constructor(self):
1363 BufferedWriterTest.test_constructor(self)
1364 # The allocation can succeed on 32-bit builds, e.g. with more
1365 # than 2GB RAM and a 64-bit kernel.
1366 if sys.maxsize > 0x7FFFFFFF:
1367 rawio = self.MockRawIO()
1368 bufio = self.tp(rawio)
1369 self.assertRaises((OverflowError, MemoryError, ValueError),
1370 bufio.__init__, rawio, sys.maxsize)
1371
1372 def test_initialization(self):
1373 rawio = self.MockRawIO()
1374 bufio = self.tp(rawio)
1375 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1376 self.assertRaises(ValueError, bufio.write, b"def")
1377 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1378 self.assertRaises(ValueError, bufio.write, b"def")
1379 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1380 self.assertRaises(ValueError, bufio.write, b"def")
1381
1382 def test_garbage_collection(self):
1383 # C BufferedWriter objects are collected, and collecting them flushes
1384 # all data to disk.
1385 # The Python version has __del__, so it ends into gc.garbage instead
Antoine Pitrou796564c2013-07-30 19:59:21 +02001386 with support.check_warnings(('', ResourceWarning)):
1387 rawio = self.FileIO(support.TESTFN, "w+b")
1388 f = self.tp(rawio)
1389 f.write(b"123xxx")
1390 f.x = f
1391 wr = weakref.ref(f)
1392 del f
1393 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001394 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001395 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001396 self.assertEqual(f.read(), b"123xxx")
1397
R David Murray67bfe802013-02-23 21:51:05 -05001398 def test_args_error(self):
1399 # Issue #17275
1400 with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1401 self.tp(io.BytesIO(), 1024, 1024, 1024)
1402
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001403
1404class PyBufferedWriterTest(BufferedWriterTest):
1405 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001406
Guido van Rossum01a27522007-03-07 01:00:12 +00001407class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001408
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001409 def test_constructor(self):
1410 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001411 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001412
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001413 def test_uninitialized(self):
1414 pair = self.tp.__new__(self.tp)
1415 del pair
1416 pair = self.tp.__new__(self.tp)
1417 self.assertRaisesRegex((ValueError, AttributeError),
1418 'uninitialized|has no attribute',
1419 pair.read, 0)
1420 self.assertRaisesRegex((ValueError, AttributeError),
1421 'uninitialized|has no attribute',
1422 pair.write, b'')
1423 pair.__init__(self.MockRawIO(), self.MockRawIO())
1424 self.assertEqual(pair.read(0), b'')
1425 self.assertEqual(pair.write(b''), 0)
1426
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001427 def test_detach(self):
1428 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1429 self.assertRaises(self.UnsupportedOperation, pair.detach)
1430
Florent Xicluna109d5732012-07-07 17:03:22 +02001431 def test_constructor_max_buffer_size_removal(self):
1432 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001433 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001434
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001435 def test_constructor_with_not_readable(self):
1436 class NotReadable(MockRawIO):
1437 def readable(self):
1438 return False
1439
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001440 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001441
1442 def test_constructor_with_not_writeable(self):
1443 class NotWriteable(MockRawIO):
1444 def writable(self):
1445 return False
1446
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001447 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001448
1449 def test_read(self):
1450 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1451
1452 self.assertEqual(pair.read(3), b"abc")
1453 self.assertEqual(pair.read(1), b"d")
1454 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001455 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1456 self.assertEqual(pair.read(None), b"abc")
1457
1458 def test_readlines(self):
1459 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1460 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1461 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1462 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001463
1464 def test_read1(self):
1465 # .read1() is delegated to the underlying reader object, so this test
1466 # can be shallow.
1467 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1468
1469 self.assertEqual(pair.read1(3), b"abc")
1470
1471 def test_readinto(self):
1472 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1473
1474 data = bytearray(5)
1475 self.assertEqual(pair.readinto(data), 5)
1476 self.assertEqual(data, b"abcde")
1477
1478 def test_write(self):
1479 w = self.MockRawIO()
1480 pair = self.tp(self.MockRawIO(), w)
1481
1482 pair.write(b"abc")
1483 pair.flush()
1484 pair.write(b"def")
1485 pair.flush()
1486 self.assertEqual(w._write_stack, [b"abc", b"def"])
1487
1488 def test_peek(self):
1489 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1490
1491 self.assertTrue(pair.peek(3).startswith(b"abc"))
1492 self.assertEqual(pair.read(3), b"abc")
1493
1494 def test_readable(self):
1495 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1496 self.assertTrue(pair.readable())
1497
1498 def test_writeable(self):
1499 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1500 self.assertTrue(pair.writable())
1501
1502 def test_seekable(self):
1503 # BufferedRWPairs are never seekable, even if their readers and writers
1504 # are.
1505 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1506 self.assertFalse(pair.seekable())
1507
1508 # .flush() is delegated to the underlying writer object and has been
1509 # tested in the test_write method.
1510
1511 def test_close_and_closed(self):
1512 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1513 self.assertFalse(pair.closed)
1514 pair.close()
1515 self.assertTrue(pair.closed)
1516
1517 def test_isatty(self):
1518 class SelectableIsAtty(MockRawIO):
1519 def __init__(self, isatty):
1520 MockRawIO.__init__(self)
1521 self._isatty = isatty
1522
1523 def isatty(self):
1524 return self._isatty
1525
1526 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1527 self.assertFalse(pair.isatty())
1528
1529 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1530 self.assertTrue(pair.isatty())
1531
1532 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1533 self.assertTrue(pair.isatty())
1534
1535 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1536 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001537
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001538class CBufferedRWPairTest(BufferedRWPairTest):
1539 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001540
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001541class PyBufferedRWPairTest(BufferedRWPairTest):
1542 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001543
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001544
1545class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1546 read_mode = "rb+"
1547 write_mode = "wb+"
1548
1549 def test_constructor(self):
1550 BufferedReaderTest.test_constructor(self)
1551 BufferedWriterTest.test_constructor(self)
1552
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001553 def test_uninitialized(self):
1554 BufferedReaderTest.test_uninitialized(self)
1555 BufferedWriterTest.test_uninitialized(self)
1556
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001557 def test_read_and_write(self):
1558 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001559 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001560
1561 self.assertEqual(b"as", rw.read(2))
1562 rw.write(b"ddd")
1563 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001564 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001565 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001566 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001567
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001568 def test_seek_and_tell(self):
1569 raw = self.BytesIO(b"asdfghjkl")
1570 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001571
Ezio Melottib3aedd42010-11-20 19:04:17 +00001572 self.assertEqual(b"as", rw.read(2))
1573 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001574 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001575 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00001576
Antoine Pitroue05565e2011-08-20 14:39:23 +02001577 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00001578 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02001579 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001580 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001581 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001582 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001583 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001584 self.assertEqual(7, rw.tell())
1585 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02001586 rw.flush()
1587 self.assertEqual(b"asdf123fl", raw.getvalue())
1588
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001589 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001590
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001591 def check_flush_and_read(self, read_func):
1592 raw = self.BytesIO(b"abcdefghi")
1593 bufio = self.tp(raw)
1594
Ezio Melottib3aedd42010-11-20 19:04:17 +00001595 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001596 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001597 self.assertEqual(b"ef", read_func(bufio, 2))
1598 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001599 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001600 self.assertEqual(6, bufio.tell())
1601 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001602 raw.seek(0, 0)
1603 raw.write(b"XYZ")
1604 # flush() resets the read buffer
1605 bufio.flush()
1606 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001607 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001608
1609 def test_flush_and_read(self):
1610 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1611
1612 def test_flush_and_readinto(self):
1613 def _readinto(bufio, n=-1):
1614 b = bytearray(n if n >= 0 else 9999)
1615 n = bufio.readinto(b)
1616 return bytes(b[:n])
1617 self.check_flush_and_read(_readinto)
1618
1619 def test_flush_and_peek(self):
1620 def _peek(bufio, n=-1):
1621 # This relies on the fact that the buffer can contain the whole
1622 # raw stream, otherwise peek() can return less.
1623 b = bufio.peek(n)
1624 if n != -1:
1625 b = b[:n]
1626 bufio.seek(len(b), 1)
1627 return b
1628 self.check_flush_and_read(_peek)
1629
1630 def test_flush_and_write(self):
1631 raw = self.BytesIO(b"abcdefghi")
1632 bufio = self.tp(raw)
1633
1634 bufio.write(b"123")
1635 bufio.flush()
1636 bufio.write(b"45")
1637 bufio.flush()
1638 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001639 self.assertEqual(b"12345fghi", raw.getvalue())
1640 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001641
1642 def test_threads(self):
1643 BufferedReaderTest.test_threads(self)
1644 BufferedWriterTest.test_threads(self)
1645
1646 def test_writes_and_peek(self):
1647 def _peek(bufio):
1648 bufio.peek(1)
1649 self.check_writes(_peek)
1650 def _peek(bufio):
1651 pos = bufio.tell()
1652 bufio.seek(-1, 1)
1653 bufio.peek(1)
1654 bufio.seek(pos, 0)
1655 self.check_writes(_peek)
1656
1657 def test_writes_and_reads(self):
1658 def _read(bufio):
1659 bufio.seek(-1, 1)
1660 bufio.read(1)
1661 self.check_writes(_read)
1662
1663 def test_writes_and_read1s(self):
1664 def _read1(bufio):
1665 bufio.seek(-1, 1)
1666 bufio.read1(1)
1667 self.check_writes(_read1)
1668
1669 def test_writes_and_readintos(self):
1670 def _read(bufio):
1671 bufio.seek(-1, 1)
1672 bufio.readinto(bytearray(1))
1673 self.check_writes(_read)
1674
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001675 def test_write_after_readahead(self):
1676 # Issue #6629: writing after the buffer was filled by readahead should
1677 # first rewind the raw stream.
1678 for overwrite_size in [1, 5]:
1679 raw = self.BytesIO(b"A" * 10)
1680 bufio = self.tp(raw, 4)
1681 # Trigger readahead
1682 self.assertEqual(bufio.read(1), b"A")
1683 self.assertEqual(bufio.tell(), 1)
1684 # Overwriting should rewind the raw stream if it needs so
1685 bufio.write(b"B" * overwrite_size)
1686 self.assertEqual(bufio.tell(), overwrite_size + 1)
1687 # If the write size was smaller than the buffer size, flush() and
1688 # check that rewind happens.
1689 bufio.flush()
1690 self.assertEqual(bufio.tell(), overwrite_size + 1)
1691 s = raw.getvalue()
1692 self.assertEqual(s,
1693 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1694
Antoine Pitrou7c404892011-05-13 00:13:33 +02001695 def test_write_rewind_write(self):
1696 # Various combinations of reading / writing / seeking backwards / writing again
1697 def mutate(bufio, pos1, pos2):
1698 assert pos2 >= pos1
1699 # Fill the buffer
1700 bufio.seek(pos1)
1701 bufio.read(pos2 - pos1)
1702 bufio.write(b'\x02')
1703 # This writes earlier than the previous write, but still inside
1704 # the buffer.
1705 bufio.seek(pos1)
1706 bufio.write(b'\x01')
1707
1708 b = b"\x80\x81\x82\x83\x84"
1709 for i in range(0, len(b)):
1710 for j in range(i, len(b)):
1711 raw = self.BytesIO(b)
1712 bufio = self.tp(raw, 100)
1713 mutate(bufio, i, j)
1714 bufio.flush()
1715 expected = bytearray(b)
1716 expected[j] = 2
1717 expected[i] = 1
1718 self.assertEqual(raw.getvalue(), expected,
1719 "failed result for i=%d, j=%d" % (i, j))
1720
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001721 def test_truncate_after_read_or_write(self):
1722 raw = self.BytesIO(b"A" * 10)
1723 bufio = self.tp(raw, 100)
1724 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1725 self.assertEqual(bufio.truncate(), 2)
1726 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1727 self.assertEqual(bufio.truncate(), 4)
1728
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001729 def test_misbehaved_io(self):
1730 BufferedReaderTest.test_misbehaved_io(self)
1731 BufferedWriterTest.test_misbehaved_io(self)
1732
Antoine Pitroue05565e2011-08-20 14:39:23 +02001733 def test_interleaved_read_write(self):
1734 # Test for issue #12213
1735 with self.BytesIO(b'abcdefgh') as raw:
1736 with self.tp(raw, 100) as f:
1737 f.write(b"1")
1738 self.assertEqual(f.read(1), b'b')
1739 f.write(b'2')
1740 self.assertEqual(f.read1(1), b'd')
1741 f.write(b'3')
1742 buf = bytearray(1)
1743 f.readinto(buf)
1744 self.assertEqual(buf, b'f')
1745 f.write(b'4')
1746 self.assertEqual(f.peek(1), b'h')
1747 f.flush()
1748 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1749
1750 with self.BytesIO(b'abc') as raw:
1751 with self.tp(raw, 100) as f:
1752 self.assertEqual(f.read(1), b'a')
1753 f.write(b"2")
1754 self.assertEqual(f.read(1), b'c')
1755 f.flush()
1756 self.assertEqual(raw.getvalue(), b'a2c')
1757
1758 def test_interleaved_readline_write(self):
1759 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1760 with self.tp(raw) as f:
1761 f.write(b'1')
1762 self.assertEqual(f.readline(), b'b\n')
1763 f.write(b'2')
1764 self.assertEqual(f.readline(), b'def\n')
1765 f.write(b'3')
1766 self.assertEqual(f.readline(), b'\n')
1767 f.flush()
1768 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1769
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001770 # You can't construct a BufferedRandom over a non-seekable stream.
1771 test_unseekable = None
1772
R David Murray67bfe802013-02-23 21:51:05 -05001773
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001774class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001775 tp = io.BufferedRandom
1776
1777 def test_constructor(self):
1778 BufferedRandomTest.test_constructor(self)
1779 # The allocation can succeed on 32-bit builds, e.g. with more
1780 # than 2GB RAM and a 64-bit kernel.
1781 if sys.maxsize > 0x7FFFFFFF:
1782 rawio = self.MockRawIO()
1783 bufio = self.tp(rawio)
1784 self.assertRaises((OverflowError, MemoryError, ValueError),
1785 bufio.__init__, rawio, sys.maxsize)
1786
1787 def test_garbage_collection(self):
1788 CBufferedReaderTest.test_garbage_collection(self)
1789 CBufferedWriterTest.test_garbage_collection(self)
1790
R David Murray67bfe802013-02-23 21:51:05 -05001791 def test_args_error(self):
1792 # Issue #17275
1793 with self.assertRaisesRegex(TypeError, "BufferedRandom"):
1794 self.tp(io.BytesIO(), 1024, 1024, 1024)
1795
1796
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001797class PyBufferedRandomTest(BufferedRandomTest):
1798 tp = pyio.BufferedRandom
1799
1800
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001801# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1802# properties:
1803# - A single output character can correspond to many bytes of input.
1804# - The number of input bytes to complete the character can be
1805# undetermined until the last input byte is received.
1806# - The number of input bytes can vary depending on previous input.
1807# - A single input byte can correspond to many characters of output.
1808# - The number of output characters can be undetermined until the
1809# last input byte is received.
1810# - The number of output characters can vary depending on previous input.
1811
1812class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1813 """
1814 For testing seek/tell behavior with a stateful, buffering decoder.
1815
1816 Input is a sequence of words. Words may be fixed-length (length set
1817 by input) or variable-length (period-terminated). In variable-length
1818 mode, extra periods are ignored. Possible words are:
1819 - 'i' followed by a number sets the input length, I (maximum 99).
1820 When I is set to 0, words are space-terminated.
1821 - 'o' followed by a number sets the output length, O (maximum 99).
1822 - Any other word is converted into a word followed by a period on
1823 the output. The output word consists of the input word truncated
1824 or padded out with hyphens to make its length equal to O. If O
1825 is 0, the word is output verbatim without truncating or padding.
1826 I and O are initially set to 1. When I changes, any buffered input is
1827 re-scanned according to the new I. EOF also terminates the last word.
1828 """
1829
1830 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001831 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001832 self.reset()
1833
1834 def __repr__(self):
1835 return '<SID %x>' % id(self)
1836
1837 def reset(self):
1838 self.i = 1
1839 self.o = 1
1840 self.buffer = bytearray()
1841
1842 def getstate(self):
1843 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1844 return bytes(self.buffer), i*100 + o
1845
1846 def setstate(self, state):
1847 buffer, io = state
1848 self.buffer = bytearray(buffer)
1849 i, o = divmod(io, 100)
1850 self.i, self.o = i ^ 1, o ^ 1
1851
1852 def decode(self, input, final=False):
1853 output = ''
1854 for b in input:
1855 if self.i == 0: # variable-length, terminated with period
1856 if b == ord('.'):
1857 if self.buffer:
1858 output += self.process_word()
1859 else:
1860 self.buffer.append(b)
1861 else: # fixed-length, terminate after self.i bytes
1862 self.buffer.append(b)
1863 if len(self.buffer) == self.i:
1864 output += self.process_word()
1865 if final and self.buffer: # EOF terminates the last word
1866 output += self.process_word()
1867 return output
1868
1869 def process_word(self):
1870 output = ''
1871 if self.buffer[0] == ord('i'):
1872 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1873 elif self.buffer[0] == ord('o'):
1874 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1875 else:
1876 output = self.buffer.decode('ascii')
1877 if len(output) < self.o:
1878 output += '-'*self.o # pad out with hyphens
1879 if self.o:
1880 output = output[:self.o] # truncate to output length
1881 output += '.'
1882 self.buffer = bytearray()
1883 return output
1884
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001885 codecEnabled = False
1886
1887 @classmethod
1888 def lookupTestDecoder(cls, name):
1889 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001890 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001891 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001892 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001893 incrementalencoder=None,
1894 streamreader=None, streamwriter=None,
1895 incrementaldecoder=cls)
1896
1897# Register the previous decoder for testing.
1898# Disabled by default, tests will enable it.
1899codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1900
1901
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001902class StatefulIncrementalDecoderTest(unittest.TestCase):
1903 """
1904 Make sure the StatefulIncrementalDecoder actually works.
1905 """
1906
1907 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001908 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001909 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001910 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001911 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001912 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001913 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001914 # I=0, O=6 (variable-length input, fixed-length output)
1915 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1916 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001917 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001918 # I=6, O=3 (fixed-length input > fixed-length output)
1919 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1920 # I=0, then 3; O=29, then 15 (with longer output)
1921 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1922 'a----------------------------.' +
1923 'b----------------------------.' +
1924 'cde--------------------------.' +
1925 'abcdefghijabcde.' +
1926 'a.b------------.' +
1927 '.c.------------.' +
1928 'd.e------------.' +
1929 'k--------------.' +
1930 'l--------------.' +
1931 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001932 ]
1933
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001934 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001935 # Try a few one-shot test cases.
1936 for input, eof, output in self.test_cases:
1937 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001938 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001939
1940 # Also test an unfinished decode, followed by forcing EOF.
1941 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001942 self.assertEqual(d.decode(b'oiabcd'), '')
1943 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001944
1945class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001946
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001947 def setUp(self):
1948 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1949 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001950 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001951
Guido van Rossumd0712812007-04-11 16:32:43 +00001952 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001953 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001954
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001955 def test_constructor(self):
1956 r = self.BytesIO(b"\xc3\xa9\n\n")
1957 b = self.BufferedReader(r, 1000)
1958 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001959 t.__init__(b, encoding="latin-1", newline="\r\n")
1960 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001961 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001962 t.__init__(b, encoding="utf-8", line_buffering=True)
1963 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001964 self.assertEqual(t.line_buffering, True)
1965 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001966 self.assertRaises(TypeError, t.__init__, b, newline=42)
1967 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1968
Nick Coghlana9b15242014-02-04 22:11:18 +10001969 def test_non_text_encoding_codecs_are_rejected(self):
1970 # Ensure the constructor complains if passed a codec that isn't
1971 # marked as a text encoding
1972 # http://bugs.python.org/issue20404
1973 r = self.BytesIO()
1974 b = self.BufferedWriter(r)
1975 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
1976 self.TextIOWrapper(b, encoding="hex")
1977
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001978 def test_detach(self):
1979 r = self.BytesIO()
1980 b = self.BufferedWriter(r)
1981 t = self.TextIOWrapper(b)
1982 self.assertIs(t.detach(), b)
1983
1984 t = self.TextIOWrapper(b, encoding="ascii")
1985 t.write("howdy")
1986 self.assertFalse(r.getvalue())
1987 t.detach()
1988 self.assertEqual(r.getvalue(), b"howdy")
1989 self.assertRaises(ValueError, t.detach)
1990
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001991 def test_repr(self):
1992 raw = self.BytesIO("hello".encode("utf-8"))
1993 b = self.BufferedReader(raw)
1994 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001995 modname = self.TextIOWrapper.__module__
1996 self.assertEqual(repr(t),
1997 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1998 raw.name = "dummy"
1999 self.assertEqual(repr(t),
2000 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002001 t.mode = "r"
2002 self.assertEqual(repr(t),
2003 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002004 raw.name = b"dummy"
2005 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002006 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002007
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002008 def test_line_buffering(self):
2009 r = self.BytesIO()
2010 b = self.BufferedWriter(r, 1000)
2011 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002012 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002013 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002014 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002015 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002016 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002017 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002018
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002019 def test_default_encoding(self):
2020 old_environ = dict(os.environ)
2021 try:
2022 # try to get a user preferred encoding different than the current
2023 # locale encoding to check that TextIOWrapper() uses the current
2024 # locale encoding and not the user preferred encoding
2025 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2026 if key in os.environ:
2027 del os.environ[key]
2028
2029 current_locale_encoding = locale.getpreferredencoding(False)
2030 b = self.BytesIO()
2031 t = self.TextIOWrapper(b)
2032 self.assertEqual(t.encoding, current_locale_encoding)
2033 finally:
2034 os.environ.clear()
2035 os.environ.update(old_environ)
2036
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002037 @support.cpython_only
Serhiy Storchaka78980432013-01-15 01:12:17 +02002038 def test_device_encoding(self):
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002039 # Issue 15989
2040 import _testcapi
Serhiy Storchaka78980432013-01-15 01:12:17 +02002041 b = self.BytesIO()
2042 b.fileno = lambda: _testcapi.INT_MAX + 1
2043 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2044 b.fileno = lambda: _testcapi.UINT_MAX + 1
2045 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2046
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002047 def test_encoding(self):
2048 # Check the encoding attribute is always set, and valid
2049 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002050 t = self.TextIOWrapper(b, encoding="utf-8")
2051 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002052 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002053 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002054 codecs.lookup(t.encoding)
2055
2056 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002057 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002058 b = self.BytesIO(b"abc\n\xff\n")
2059 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002060 self.assertRaises(UnicodeError, t.read)
2061 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002062 b = self.BytesIO(b"abc\n\xff\n")
2063 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002064 self.assertRaises(UnicodeError, t.read)
2065 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002066 b = self.BytesIO(b"abc\n\xff\n")
2067 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002068 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002069 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002070 b = self.BytesIO(b"abc\n\xff\n")
2071 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002072 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002073
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002074 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002075 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002076 b = self.BytesIO()
2077 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002078 self.assertRaises(UnicodeError, t.write, "\xff")
2079 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002080 b = self.BytesIO()
2081 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002082 self.assertRaises(UnicodeError, t.write, "\xff")
2083 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002084 b = self.BytesIO()
2085 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002086 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002087 t.write("abc\xffdef\n")
2088 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002089 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002090 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002091 b = self.BytesIO()
2092 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002093 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002094 t.write("abc\xffdef\n")
2095 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002096 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002097
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002098 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002099 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2100
2101 tests = [
2102 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002103 [ '', input_lines ],
2104 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2105 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2106 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002107 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002108 encodings = (
2109 'utf-8', 'latin-1',
2110 'utf-16', 'utf-16-le', 'utf-16-be',
2111 'utf-32', 'utf-32-le', 'utf-32-be',
2112 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002113
Guido van Rossum8358db22007-08-18 21:39:55 +00002114 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002115 # character in TextIOWrapper._pending_line.
2116 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002117 # XXX: str.encode() should return bytes
2118 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002119 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002120 for bufsize in range(1, 10):
2121 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002122 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2123 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002124 encoding=encoding)
2125 if do_reads:
2126 got_lines = []
2127 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002128 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002129 if c2 == '':
2130 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002131 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002132 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002133 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002134 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002135
2136 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002137 self.assertEqual(got_line, exp_line)
2138 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002139
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002140 def test_newlines_input(self):
2141 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002142 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2143 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002144 (None, normalized.decode("ascii").splitlines(keepends=True)),
2145 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002146 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2147 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2148 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002149 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002150 buf = self.BytesIO(testdata)
2151 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002152 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002153 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002154 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002155
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002156 def test_newlines_output(self):
2157 testdict = {
2158 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2159 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2160 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2161 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2162 }
2163 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2164 for newline, expected in tests:
2165 buf = self.BytesIO()
2166 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2167 txt.write("AAA\nB")
2168 txt.write("BB\nCCC\n")
2169 txt.write("X\rY\r\nZ")
2170 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002171 self.assertEqual(buf.closed, False)
2172 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002173
2174 def test_destructor(self):
2175 l = []
2176 base = self.BytesIO
2177 class MyBytesIO(base):
2178 def close(self):
2179 l.append(self.getvalue())
2180 base.close(self)
2181 b = MyBytesIO()
2182 t = self.TextIOWrapper(b, encoding="ascii")
2183 t.write("abc")
2184 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002185 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002186 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002187
2188 def test_override_destructor(self):
2189 record = []
2190 class MyTextIO(self.TextIOWrapper):
2191 def __del__(self):
2192 record.append(1)
2193 try:
2194 f = super().__del__
2195 except AttributeError:
2196 pass
2197 else:
2198 f()
2199 def close(self):
2200 record.append(2)
2201 super().close()
2202 def flush(self):
2203 record.append(3)
2204 super().flush()
2205 b = self.BytesIO()
2206 t = MyTextIO(b, encoding="ascii")
2207 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002208 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002209 self.assertEqual(record, [1, 2, 3])
2210
2211 def test_error_through_destructor(self):
2212 # Test that the exception state is not modified by a destructor,
2213 # even if close() fails.
2214 rawio = self.CloseFailureIO()
2215 def f():
2216 self.TextIOWrapper(rawio).xyzzy
2217 with support.captured_output("stderr") as s:
2218 self.assertRaises(AttributeError, f)
2219 s = s.getvalue().strip()
2220 if s:
2221 # The destructor *may* have printed an unraisable error, check it
2222 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002223 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002224 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002225
Guido van Rossum9b76da62007-04-11 01:09:03 +00002226 # Systematic tests of the text I/O API
2227
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002228 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002229 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002230 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002231 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002232 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002233 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002234 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002235 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002236 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002237 self.assertEqual(f.tell(), 0)
2238 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002239 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002240 self.assertEqual(f.seek(0), 0)
2241 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002242 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002243 self.assertEqual(f.read(2), "ab")
2244 self.assertEqual(f.read(1), "c")
2245 self.assertEqual(f.read(1), "")
2246 self.assertEqual(f.read(), "")
2247 self.assertEqual(f.tell(), cookie)
2248 self.assertEqual(f.seek(0), 0)
2249 self.assertEqual(f.seek(0, 2), cookie)
2250 self.assertEqual(f.write("def"), 3)
2251 self.assertEqual(f.seek(cookie), cookie)
2252 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002253 if enc.startswith("utf"):
2254 self.multi_line_test(f, enc)
2255 f.close()
2256
2257 def multi_line_test(self, f, enc):
2258 f.seek(0)
2259 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002260 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002261 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002262 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002263 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002264 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002265 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002266 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002267 wlines.append((f.tell(), line))
2268 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002269 f.seek(0)
2270 rlines = []
2271 while True:
2272 pos = f.tell()
2273 line = f.readline()
2274 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002275 break
2276 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002277 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002278
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002279 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002280 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002281 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002282 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002283 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002284 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002285 p2 = f.tell()
2286 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002287 self.assertEqual(f.tell(), p0)
2288 self.assertEqual(f.readline(), "\xff\n")
2289 self.assertEqual(f.tell(), p1)
2290 self.assertEqual(f.readline(), "\xff\n")
2291 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002292 f.seek(0)
2293 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002294 self.assertEqual(line, "\xff\n")
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002295 self.assertRaises(OSError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002296 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002297 f.close()
2298
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002299 def test_seeking(self):
2300 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002301 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002302 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002303 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002304 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002305 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002306 suffix = bytes(u_suffix.encode("utf-8"))
2307 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002308 with self.open(support.TESTFN, "wb") as f:
2309 f.write(line*2)
2310 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2311 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002312 self.assertEqual(s, str(prefix, "ascii"))
2313 self.assertEqual(f.tell(), prefix_size)
2314 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002315
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002316 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002317 # Regression test for a specific bug
2318 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002319 with self.open(support.TESTFN, "wb") as f:
2320 f.write(data)
2321 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2322 f._CHUNK_SIZE # Just test that it exists
2323 f._CHUNK_SIZE = 2
2324 f.readline()
2325 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002326
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002327 def test_seek_and_tell(self):
2328 #Test seek/tell using the StatefulIncrementalDecoder.
2329 # Make test faster by doing smaller seeks
2330 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002331
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002332 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002333 """Tell/seek to various points within a data stream and ensure
2334 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002335 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002336 f.write(data)
2337 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002338 f = self.open(support.TESTFN, encoding='test_decoder')
2339 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002340 decoded = f.read()
2341 f.close()
2342
Neal Norwitze2b07052008-03-18 19:52:05 +00002343 for i in range(min_pos, len(decoded) + 1): # seek positions
2344 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002345 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002346 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002347 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002348 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002349 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002350 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002351 f.close()
2352
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002353 # Enable the test decoder.
2354 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002355
2356 # Run the tests.
2357 try:
2358 # Try each test case.
2359 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002360 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002361
2362 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002363 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2364 offset = CHUNK_SIZE - len(input)//2
2365 prefix = b'.'*offset
2366 # Don't bother seeking into the prefix (takes too long).
2367 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002368 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002369
2370 # Ensure our test decoder won't interfere with subsequent tests.
2371 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002372 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002373
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002374 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002375 data = "1234567890"
2376 tests = ("utf-16",
2377 "utf-16-le",
2378 "utf-16-be",
2379 "utf-32",
2380 "utf-32-le",
2381 "utf-32-be")
2382 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002383 buf = self.BytesIO()
2384 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002385 # Check if the BOM is written only once (see issue1753).
2386 f.write(data)
2387 f.write(data)
2388 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002389 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002390 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002391 self.assertEqual(f.read(), data * 2)
2392 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002393
Benjamin Petersona1b49012009-03-31 23:11:32 +00002394 def test_unreadable(self):
2395 class UnReadable(self.BytesIO):
2396 def readable(self):
2397 return False
2398 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002399 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00002400
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002401 def test_read_one_by_one(self):
2402 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002403 reads = ""
2404 while True:
2405 c = txt.read(1)
2406 if not c:
2407 break
2408 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002409 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002410
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002411 def test_readlines(self):
2412 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2413 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2414 txt.seek(0)
2415 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2416 txt.seek(0)
2417 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2418
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002419 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002420 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002421 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002422 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002423 reads = ""
2424 while True:
2425 c = txt.read(128)
2426 if not c:
2427 break
2428 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002429 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002430
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002431 def test_writelines(self):
2432 l = ['ab', 'cd', 'ef']
2433 buf = self.BytesIO()
2434 txt = self.TextIOWrapper(buf)
2435 txt.writelines(l)
2436 txt.flush()
2437 self.assertEqual(buf.getvalue(), b'abcdef')
2438
2439 def test_writelines_userlist(self):
2440 l = UserList(['ab', 'cd', 'ef'])
2441 buf = self.BytesIO()
2442 txt = self.TextIOWrapper(buf)
2443 txt.writelines(l)
2444 txt.flush()
2445 self.assertEqual(buf.getvalue(), b'abcdef')
2446
2447 def test_writelines_error(self):
2448 txt = self.TextIOWrapper(self.BytesIO())
2449 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2450 self.assertRaises(TypeError, txt.writelines, None)
2451 self.assertRaises(TypeError, txt.writelines, b'abc')
2452
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002453 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002454 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002455
2456 # read one char at a time
2457 reads = ""
2458 while True:
2459 c = txt.read(1)
2460 if not c:
2461 break
2462 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002463 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002464
2465 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002466 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002467 txt._CHUNK_SIZE = 4
2468
2469 reads = ""
2470 while True:
2471 c = txt.read(4)
2472 if not c:
2473 break
2474 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002475 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002476
2477 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002478 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002479 txt._CHUNK_SIZE = 4
2480
2481 reads = txt.read(4)
2482 reads += txt.read(4)
2483 reads += txt.readline()
2484 reads += txt.readline()
2485 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002486 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002487
2488 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002489 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002490 txt._CHUNK_SIZE = 4
2491
2492 reads = txt.read(4)
2493 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002494 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002495
2496 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002497 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002498 txt._CHUNK_SIZE = 4
2499
2500 reads = txt.read(4)
2501 pos = txt.tell()
2502 txt.seek(0)
2503 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002504 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002505
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002506 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002507 buffer = self.BytesIO(self.testdata)
2508 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002509
2510 self.assertEqual(buffer.seekable(), txt.seekable())
2511
Antoine Pitroue4501852009-05-14 18:55:55 +00002512 def test_append_bom(self):
2513 # The BOM is not written again when appending to a non-empty file
2514 filename = support.TESTFN
2515 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2516 with self.open(filename, 'w', encoding=charset) as f:
2517 f.write('aaa')
2518 pos = f.tell()
2519 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002520 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002521
2522 with self.open(filename, 'a', encoding=charset) as f:
2523 f.write('xxx')
2524 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002525 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002526
2527 def test_seek_bom(self):
2528 # Same test, but when seeking manually
2529 filename = support.TESTFN
2530 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2531 with self.open(filename, 'w', encoding=charset) as f:
2532 f.write('aaa')
2533 pos = f.tell()
2534 with self.open(filename, 'r+', encoding=charset) as f:
2535 f.seek(pos)
2536 f.write('zzz')
2537 f.seek(0)
2538 f.write('bbb')
2539 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002540 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002541
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002542 def test_errors_property(self):
2543 with self.open(support.TESTFN, "w") as f:
2544 self.assertEqual(f.errors, "strict")
2545 with self.open(support.TESTFN, "w", errors="replace") as f:
2546 self.assertEqual(f.errors, "replace")
2547
Brett Cannon31f59292011-02-21 19:29:56 +00002548 @support.no_tracing
Victor Stinner45df8202010-04-28 22:31:17 +00002549 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002550 def test_threads_write(self):
2551 # Issue6750: concurrent writes could duplicate data
2552 event = threading.Event()
2553 with self.open(support.TESTFN, "w", buffering=1) as f:
2554 def run(n):
2555 text = "Thread%03d\n" % n
2556 event.wait()
2557 f.write(text)
2558 threads = [threading.Thread(target=lambda n=x: run(n))
2559 for x in range(20)]
2560 for t in threads:
2561 t.start()
2562 time.sleep(0.02)
2563 event.set()
2564 for t in threads:
2565 t.join()
2566 with self.open(support.TESTFN) as f:
2567 content = f.read()
2568 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002569 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002570
Antoine Pitrou6be88762010-05-03 16:48:20 +00002571 def test_flush_error_on_close(self):
2572 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2573 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002574 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00002575 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002576 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06002577 self.assertTrue(txt.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00002578
2579 def test_multi_close(self):
2580 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2581 txt.close()
2582 txt.close()
2583 txt.close()
2584 self.assertRaises(ValueError, txt.flush)
2585
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002586 def test_unseekable(self):
2587 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2588 self.assertRaises(self.UnsupportedOperation, txt.tell)
2589 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2590
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002591 def test_readonly_attributes(self):
2592 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2593 buf = self.BytesIO(self.testdata)
2594 with self.assertRaises(AttributeError):
2595 txt.buffer = buf
2596
Antoine Pitroue96ec682011-07-23 21:46:35 +02002597 def test_rawio(self):
2598 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
2599 # that subprocess.Popen() can have the required unbuffered
2600 # semantics with universal_newlines=True.
2601 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2602 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
2603 # Reads
2604 self.assertEqual(txt.read(4), 'abcd')
2605 self.assertEqual(txt.readline(), 'efghi\n')
2606 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
2607
2608 def test_rawio_write_through(self):
2609 # Issue #12591: with write_through=True, writes don't need a flush
2610 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2611 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
2612 write_through=True)
2613 txt.write('1')
2614 txt.write('23\n4')
2615 txt.write('5')
2616 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2617
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02002618 def test_bufio_write_through(self):
2619 # Issue #21396: write_through=True doesn't force a flush()
2620 # on the underlying binary buffered object.
2621 flush_called, write_called = [], []
2622 class BufferedWriter(self.BufferedWriter):
2623 def flush(self, *args, **kwargs):
2624 flush_called.append(True)
2625 return super().flush(*args, **kwargs)
2626 def write(self, *args, **kwargs):
2627 write_called.append(True)
2628 return super().write(*args, **kwargs)
2629
2630 rawio = self.BytesIO()
2631 data = b"a"
2632 bufio = BufferedWriter(rawio, len(data)*2)
2633 textio = self.TextIOWrapper(bufio, encoding='ascii',
2634 write_through=True)
2635 # write to the buffered io but don't overflow the buffer
2636 text = data.decode('ascii')
2637 textio.write(text)
2638
2639 # buffer.flush is not called with write_through=True
2640 self.assertFalse(flush_called)
2641 # buffer.write *is* called with write_through=True
2642 self.assertTrue(write_called)
2643 self.assertEqual(rawio.getvalue(), b"") # no flush
2644
2645 write_called = [] # reset
2646 textio.write(text * 10) # total content is larger than bufio buffer
2647 self.assertTrue(write_called)
2648 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
2649
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002650 def test_read_nonbytes(self):
2651 # Issue #17106
2652 # Crash when underlying read() returns non-bytes
2653 t = self.TextIOWrapper(self.StringIO('a'))
2654 self.assertRaises(TypeError, t.read, 1)
2655 t = self.TextIOWrapper(self.StringIO('a'))
2656 self.assertRaises(TypeError, t.readline)
2657 t = self.TextIOWrapper(self.StringIO('a'))
2658 self.assertRaises(TypeError, t.read)
2659
2660 def test_illegal_decoder(self):
2661 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10002662 # Bypass the early encoding check added in issue 20404
2663 def _make_illegal_wrapper():
2664 quopri = codecs.lookup("quopri")
2665 quopri._is_text_encoding = True
2666 try:
2667 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
2668 newline='\n', encoding="quopri")
2669 finally:
2670 quopri._is_text_encoding = False
2671 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002672 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10002673 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002674 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10002675 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002676 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10002677 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002678 self.assertRaises(TypeError, t.read)
2679
Antoine Pitrou712cb732013-12-21 15:51:54 +01002680 def _check_create_at_shutdown(self, **kwargs):
2681 # Issue #20037: creating a TextIOWrapper at shutdown
2682 # shouldn't crash the interpreter.
2683 iomod = self.io.__name__
2684 code = """if 1:
2685 import codecs
2686 import {iomod} as io
2687
2688 # Avoid looking up codecs at shutdown
2689 codecs.lookup('utf-8')
2690
2691 class C:
2692 def __init__(self):
2693 self.buf = io.BytesIO()
2694 def __del__(self):
2695 io.TextIOWrapper(self.buf, **{kwargs})
2696 print("ok")
2697 c = C()
2698 """.format(iomod=iomod, kwargs=kwargs)
2699 return assert_python_ok("-c", code)
2700
2701 def test_create_at_shutdown_without_encoding(self):
2702 rc, out, err = self._check_create_at_shutdown()
2703 if err:
2704 # Can error out with a RuntimeError if the module state
2705 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10002706 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01002707 else:
2708 self.assertEqual("ok", out.decode().strip())
2709
2710 def test_create_at_shutdown_with_encoding(self):
2711 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
2712 errors='strict')
2713 self.assertFalse(err)
2714 self.assertEqual("ok", out.decode().strip())
2715
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002716
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002717class CTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01002718 io = io
Nick Coghlana9b15242014-02-04 22:11:18 +10002719 shutdown_error = "RuntimeError: could not find io module state"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002720
2721 def test_initialization(self):
2722 r = self.BytesIO(b"\xc3\xa9\n\n")
2723 b = self.BufferedReader(r, 1000)
2724 t = self.TextIOWrapper(b)
2725 self.assertRaises(TypeError, t.__init__, b, newline=42)
2726 self.assertRaises(ValueError, t.read)
2727 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2728 self.assertRaises(ValueError, t.read)
2729
2730 def test_garbage_collection(self):
2731 # C TextIOWrapper objects are collected, and collecting them flushes
2732 # all data to disk.
2733 # The Python version has __del__, so it ends in gc.garbage instead.
Antoine Pitrou796564c2013-07-30 19:59:21 +02002734 with support.check_warnings(('', ResourceWarning)):
2735 rawio = io.FileIO(support.TESTFN, "wb")
2736 b = self.BufferedWriter(rawio)
2737 t = self.TextIOWrapper(b, encoding="ascii")
2738 t.write("456def")
2739 t.x = t
2740 wr = weakref.ref(t)
2741 del t
2742 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002743 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002744 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002745 self.assertEqual(f.read(), b"456def")
2746
Charles-François Natali42c28cd2011-10-05 19:53:43 +02002747 def test_rwpair_cleared_before_textio(self):
2748 # Issue 13070: TextIOWrapper's finalization would crash when called
2749 # after the reference to the underlying BufferedRWPair's writer got
2750 # cleared by the GC.
2751 for i in range(1000):
2752 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2753 t1 = self.TextIOWrapper(b1, encoding="ascii")
2754 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2755 t2 = self.TextIOWrapper(b2, encoding="ascii")
2756 # circular references
2757 t1.buddy = t2
2758 t2.buddy = t1
2759 support.gc_collect()
2760
2761
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002762class PyTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01002763 io = pyio
Serhiy Storchakad667d722014-02-10 19:09:19 +02002764 #shutdown_error = "LookupError: unknown encoding: ascii"
2765 shutdown_error = "TypeError: 'NoneType' object is not iterable"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002766
2767
2768class IncrementalNewlineDecoderTest(unittest.TestCase):
2769
2770 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002771 # UTF-8 specific tests for a newline decoder
2772 def _check_decode(b, s, **kwargs):
2773 # We exercise getstate() / setstate() as well as decode()
2774 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002775 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002776 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002777 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002778
Antoine Pitrou180a3362008-12-14 16:36:46 +00002779 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002780
Antoine Pitrou180a3362008-12-14 16:36:46 +00002781 _check_decode(b'\xe8', "")
2782 _check_decode(b'\xa2', "")
2783 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002784
Antoine Pitrou180a3362008-12-14 16:36:46 +00002785 _check_decode(b'\xe8', "")
2786 _check_decode(b'\xa2', "")
2787 _check_decode(b'\x88', "\u8888")
2788
2789 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002790 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2791
Antoine Pitrou180a3362008-12-14 16:36:46 +00002792 decoder.reset()
2793 _check_decode(b'\n', "\n")
2794 _check_decode(b'\r', "")
2795 _check_decode(b'', "\n", final=True)
2796 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002797
Antoine Pitrou180a3362008-12-14 16:36:46 +00002798 _check_decode(b'\r', "")
2799 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002800
Antoine Pitrou180a3362008-12-14 16:36:46 +00002801 _check_decode(b'\r\r\n', "\n\n")
2802 _check_decode(b'\r', "")
2803 _check_decode(b'\r', "\n")
2804 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002805
Antoine Pitrou180a3362008-12-14 16:36:46 +00002806 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2807 _check_decode(b'\xe8\xa2\x88', "\u8888")
2808 _check_decode(b'\n', "\n")
2809 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2810 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002811
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002812 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002813 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002814 if encoding is not None:
2815 encoder = codecs.getincrementalencoder(encoding)()
2816 def _decode_bytewise(s):
2817 # Decode one byte at a time
2818 for b in encoder.encode(s):
2819 result.append(decoder.decode(bytes([b])))
2820 else:
2821 encoder = None
2822 def _decode_bytewise(s):
2823 # Decode one char at a time
2824 for c in s:
2825 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002826 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002827 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002828 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00002829 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002830 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002831 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002832 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002833 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002834 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002835 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002836 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002837 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002838 input = "abc"
2839 if encoder is not None:
2840 encoder.reset()
2841 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002842 self.assertEqual(decoder.decode(input), "abc")
2843 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002844
2845 def test_newline_decoder(self):
2846 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002847 # None meaning the IncrementalNewlineDecoder takes unicode input
2848 # rather than bytes input
2849 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002850 'utf-16', 'utf-16-le', 'utf-16-be',
2851 'utf-32', 'utf-32-le', 'utf-32-be',
2852 )
2853 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002854 decoder = enc and codecs.getincrementaldecoder(enc)()
2855 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2856 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002857 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002858 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2859 self.check_newline_decoding_utf8(decoder)
2860
Antoine Pitrou66913e22009-03-06 23:40:56 +00002861 def test_newline_bytes(self):
2862 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2863 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002864 self.assertEqual(dec.newlines, None)
2865 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2866 self.assertEqual(dec.newlines, None)
2867 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2868 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00002869 dec = self.IncrementalNewlineDecoder(None, translate=False)
2870 _check(dec)
2871 dec = self.IncrementalNewlineDecoder(None, translate=True)
2872 _check(dec)
2873
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002874class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2875 pass
2876
2877class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2878 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002879
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002880
Guido van Rossum01a27522007-03-07 01:00:12 +00002881# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002882
Guido van Rossum5abbf752007-08-27 17:39:33 +00002883class MiscIOTest(unittest.TestCase):
2884
Barry Warsaw40e82462008-11-20 20:14:50 +00002885 def tearDown(self):
2886 support.unlink(support.TESTFN)
2887
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002888 def test___all__(self):
2889 for name in self.io.__all__:
2890 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002891 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002892 if name == "open":
2893 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002894 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002895 self.assertTrue(issubclass(obj, Exception), name)
2896 elif not name.startswith("SEEK_"):
2897 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002898
Barry Warsaw40e82462008-11-20 20:14:50 +00002899 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002900 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002901 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002902 f.close()
2903
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +02002904 with support.check_warnings(('', DeprecationWarning)):
2905 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002906 self.assertEqual(f.name, support.TESTFN)
2907 self.assertEqual(f.buffer.name, support.TESTFN)
2908 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2909 self.assertEqual(f.mode, "U")
2910 self.assertEqual(f.buffer.mode, "rb")
2911 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002912 f.close()
2913
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002914 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002915 self.assertEqual(f.mode, "w+")
2916 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2917 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002918
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002919 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002920 self.assertEqual(g.mode, "wb")
2921 self.assertEqual(g.raw.mode, "wb")
2922 self.assertEqual(g.name, f.fileno())
2923 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00002924 f.close()
2925 g.close()
2926
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002927 def test_io_after_close(self):
2928 for kwargs in [
2929 {"mode": "w"},
2930 {"mode": "wb"},
2931 {"mode": "w", "buffering": 1},
2932 {"mode": "w", "buffering": 2},
2933 {"mode": "wb", "buffering": 0},
2934 {"mode": "r"},
2935 {"mode": "rb"},
2936 {"mode": "r", "buffering": 1},
2937 {"mode": "r", "buffering": 2},
2938 {"mode": "rb", "buffering": 0},
2939 {"mode": "w+"},
2940 {"mode": "w+b"},
2941 {"mode": "w+", "buffering": 1},
2942 {"mode": "w+", "buffering": 2},
2943 {"mode": "w+b", "buffering": 0},
2944 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002945 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002946 f.close()
2947 self.assertRaises(ValueError, f.flush)
2948 self.assertRaises(ValueError, f.fileno)
2949 self.assertRaises(ValueError, f.isatty)
2950 self.assertRaises(ValueError, f.__iter__)
2951 if hasattr(f, "peek"):
2952 self.assertRaises(ValueError, f.peek, 1)
2953 self.assertRaises(ValueError, f.read)
2954 if hasattr(f, "read1"):
2955 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02002956 if hasattr(f, "readall"):
2957 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002958 if hasattr(f, "readinto"):
2959 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2960 self.assertRaises(ValueError, f.readline)
2961 self.assertRaises(ValueError, f.readlines)
2962 self.assertRaises(ValueError, f.seek, 0)
2963 self.assertRaises(ValueError, f.tell)
2964 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002965 self.assertRaises(ValueError, f.write,
2966 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002967 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002968 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002969
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002970 def test_blockingioerror(self):
2971 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002972 class C(str):
2973 pass
2974 c = C("")
2975 b = self.BlockingIOError(1, c)
2976 c.b = b
2977 b.c = c
2978 wr = weakref.ref(c)
2979 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002980 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002981 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002982
2983 def test_abcs(self):
2984 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002985 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2986 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2987 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2988 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002989
2990 def _check_abc_inheritance(self, abcmodule):
2991 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002992 self.assertIsInstance(f, abcmodule.IOBase)
2993 self.assertIsInstance(f, abcmodule.RawIOBase)
2994 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2995 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002996 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002997 self.assertIsInstance(f, abcmodule.IOBase)
2998 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2999 self.assertIsInstance(f, abcmodule.BufferedIOBase)
3000 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003001 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003002 self.assertIsInstance(f, abcmodule.IOBase)
3003 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3004 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3005 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003006
3007 def test_abc_inheritance(self):
3008 # Test implementations inherit from their respective ABCs
3009 self._check_abc_inheritance(self)
3010
3011 def test_abc_inheritance_official(self):
3012 # Test implementations inherit from the official ABCs of the
3013 # baseline "io" module.
3014 self._check_abc_inheritance(io)
3015
Antoine Pitroue033e062010-10-29 10:38:18 +00003016 def _check_warn_on_dealloc(self, *args, **kwargs):
3017 f = open(*args, **kwargs)
3018 r = repr(f)
3019 with self.assertWarns(ResourceWarning) as cm:
3020 f = None
3021 support.gc_collect()
3022 self.assertIn(r, str(cm.warning.args[0]))
3023
3024 def test_warn_on_dealloc(self):
3025 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
3026 self._check_warn_on_dealloc(support.TESTFN, "wb")
3027 self._check_warn_on_dealloc(support.TESTFN, "w")
3028
3029 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
3030 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00003031 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00003032 for fd in fds:
3033 try:
3034 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02003035 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00003036 if e.errno != errno.EBADF:
3037 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00003038 self.addCleanup(cleanup_fds)
3039 r, w = os.pipe()
3040 fds += r, w
3041 self._check_warn_on_dealloc(r, *args, **kwargs)
3042 # When using closefd=False, there's no warning
3043 r, w = os.pipe()
3044 fds += r, w
3045 with warnings.catch_warnings(record=True) as recorded:
3046 open(r, *args, closefd=False, **kwargs)
3047 support.gc_collect()
3048 self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00003049
3050 def test_warn_on_dealloc_fd(self):
3051 self._check_warn_on_dealloc_fd("rb", buffering=0)
3052 self._check_warn_on_dealloc_fd("rb")
3053 self._check_warn_on_dealloc_fd("r")
3054
3055
Antoine Pitrou243757e2010-11-05 21:15:39 +00003056 def test_pickling(self):
3057 # Pickling file objects is forbidden
3058 for kwargs in [
3059 {"mode": "w"},
3060 {"mode": "wb"},
3061 {"mode": "wb", "buffering": 0},
3062 {"mode": "r"},
3063 {"mode": "rb"},
3064 {"mode": "rb", "buffering": 0},
3065 {"mode": "w+"},
3066 {"mode": "w+b"},
3067 {"mode": "w+b", "buffering": 0},
3068 ]:
3069 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
3070 with self.open(support.TESTFN, **kwargs) as f:
3071 self.assertRaises(TypeError, pickle.dumps, f, protocol)
3072
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003073 @unittest.skipUnless(fcntl, 'fcntl required for this test')
3074 def test_nonblock_pipe_write_bigbuf(self):
3075 self._test_nonblock_pipe_write(16*1024)
3076
3077 @unittest.skipUnless(fcntl, 'fcntl required for this test')
3078 def test_nonblock_pipe_write_smallbuf(self):
3079 self._test_nonblock_pipe_write(1024)
3080
3081 def _set_non_blocking(self, fd):
3082 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
3083 self.assertNotEqual(flags, -1)
3084 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
3085 self.assertEqual(res, 0)
3086
3087 def _test_nonblock_pipe_write(self, bufsize):
3088 sent = []
3089 received = []
3090 r, w = os.pipe()
3091 self._set_non_blocking(r)
3092 self._set_non_blocking(w)
3093
3094 # To exercise all code paths in the C implementation we need
3095 # to play with buffer sizes. For instance, if we choose a
3096 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
3097 # then we will never get a partial write of the buffer.
3098 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
3099 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
3100
3101 with rf, wf:
3102 for N in 9999, 73, 7574:
3103 try:
3104 i = 0
3105 while True:
3106 msg = bytes([i % 26 + 97]) * N
3107 sent.append(msg)
3108 wf.write(msg)
3109 i += 1
3110
3111 except self.BlockingIOError as e:
3112 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003113 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003114 sent[-1] = sent[-1][:e.characters_written]
3115 received.append(rf.read())
3116 msg = b'BLOCKED'
3117 wf.write(msg)
3118 sent.append(msg)
3119
3120 while True:
3121 try:
3122 wf.flush()
3123 break
3124 except self.BlockingIOError as e:
3125 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003126 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003127 self.assertEqual(e.characters_written, 0)
3128 received.append(rf.read())
3129
3130 received += iter(rf.read, None)
3131
3132 sent, received = b''.join(sent), b''.join(received)
3133 self.assertTrue(sent == received)
3134 self.assertTrue(wf.closed)
3135 self.assertTrue(rf.closed)
3136
Charles-François Natalidc3044c2012-01-09 22:40:02 +01003137 def test_create_fail(self):
3138 # 'x' mode fails if file is existing
3139 with self.open(support.TESTFN, 'w'):
3140 pass
3141 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
3142
3143 def test_create_writes(self):
3144 # 'x' mode opens for writing
3145 with self.open(support.TESTFN, 'xb') as f:
3146 f.write(b"spam")
3147 with self.open(support.TESTFN, 'rb') as f:
3148 self.assertEqual(b"spam", f.read())
3149
Christian Heimes7b648752012-09-10 14:48:43 +02003150 def test_open_allargs(self):
3151 # there used to be a buffer overflow in the parser for rawmode
3152 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
3153
3154
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003155class CMiscIOTest(MiscIOTest):
3156 io = io
3157
Serhiy Storchaka37a79a12013-05-28 16:24:45 +03003158 def test_readinto_buffer_overflow(self):
3159 # Issue #18025
3160 class BadReader(self.io.BufferedIOBase):
3161 def read(self, n=-1):
3162 return b'x' * 10**6
3163 bufio = BadReader()
3164 b = bytearray(2)
3165 self.assertRaises(ValueError, bufio.readinto, b)
3166
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003167class PyMiscIOTest(MiscIOTest):
3168 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003169
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003170
3171@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3172class SignalsTest(unittest.TestCase):
3173
3174 def setUp(self):
3175 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
3176
3177 def tearDown(self):
3178 signal.signal(signal.SIGALRM, self.oldalrm)
3179
3180 def alarm_interrupt(self, sig, frame):
3181 1/0
3182
3183 @unittest.skipUnless(threading, 'Threading required for this test.')
3184 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
3185 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00003186 invokes the signal handler, and bubbles up the exception raised
3187 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003188 read_results = []
3189 def _read():
Victor Stinnera9293352011-04-30 15:21:58 +02003190 if hasattr(signal, 'pthread_sigmask'):
3191 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003192 s = os.read(r, 1)
3193 read_results.append(s)
3194 t = threading.Thread(target=_read)
3195 t.daemon = True
3196 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00003197 fdopen_kwargs["closefd"] = False
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003198 try:
3199 wio = self.io.open(w, **fdopen_kwargs)
3200 t.start()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003201 # Fill the pipe enough that the write will be blocking.
3202 # It will be interrupted by the timer armed above. Since the
3203 # other thread has read one byte, the low-level write will
3204 # return with a successful (partial) result rather than an EINTR.
3205 # The buffered IO layer must check for pending signal
3206 # handlers, which in this case will invoke alarm_interrupt().
Victor Stinner775b2dd2013-07-15 19:53:13 +02003207 signal.alarm(1)
3208 try:
3209 self.assertRaises(ZeroDivisionError,
3210 wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
3211 finally:
3212 signal.alarm(0)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003213 t.join()
3214 # We got one byte, get another one and check that it isn't a
3215 # repeat of the first one.
3216 read_results.append(os.read(r, 1))
3217 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3218 finally:
3219 os.close(w)
3220 os.close(r)
3221 # This is deliberate. If we didn't close the file descriptor
3222 # before closing wio, wio would try to flush its internal
3223 # buffer, and block again.
3224 try:
3225 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003226 except OSError as e:
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003227 if e.errno != errno.EBADF:
3228 raise
3229
3230 def test_interrupted_write_unbuffered(self):
3231 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3232
3233 def test_interrupted_write_buffered(self):
3234 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3235
3236 def test_interrupted_write_text(self):
3237 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3238
Brett Cannon31f59292011-02-21 19:29:56 +00003239 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003240 def check_reentrant_write(self, data, **fdopen_kwargs):
3241 def on_alarm(*args):
3242 # Will be called reentrantly from the same thread
3243 wio.write(data)
3244 1/0
3245 signal.signal(signal.SIGALRM, on_alarm)
3246 r, w = os.pipe()
3247 wio = self.io.open(w, **fdopen_kwargs)
3248 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003249 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003250 # Either the reentrant call to wio.write() fails with RuntimeError,
3251 # or the signal handler raises ZeroDivisionError.
3252 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3253 while 1:
3254 for i in range(100):
3255 wio.write(data)
3256 wio.flush()
3257 # Make sure the buffer doesn't fill up and block further writes
3258 os.read(r, len(data) * 100)
3259 exc = cm.exception
3260 if isinstance(exc, RuntimeError):
3261 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3262 finally:
3263 wio.close()
3264 os.close(r)
3265
3266 def test_reentrant_write_buffered(self):
3267 self.check_reentrant_write(b"xy", mode="wb")
3268
3269 def test_reentrant_write_text(self):
3270 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3271
Antoine Pitrou707ce822011-02-25 21:24:11 +00003272 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3273 """Check that a buffered read, when it gets interrupted (either
3274 returning a partial result or EINTR), properly invokes the signal
3275 handler and retries if the latter returned successfully."""
3276 r, w = os.pipe()
3277 fdopen_kwargs["closefd"] = False
3278 def alarm_handler(sig, frame):
3279 os.write(w, b"bar")
3280 signal.signal(signal.SIGALRM, alarm_handler)
3281 try:
3282 rio = self.io.open(r, **fdopen_kwargs)
3283 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003284 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003285 # Expected behaviour:
3286 # - first raw read() returns partial b"foo"
3287 # - second raw read() returns EINTR
3288 # - third raw read() returns b"bar"
3289 self.assertEqual(decode(rio.read(6)), "foobar")
3290 finally:
3291 rio.close()
3292 os.close(w)
3293 os.close(r)
3294
Antoine Pitrou20db5112011-08-19 20:32:34 +02003295 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003296 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3297 mode="rb")
3298
Antoine Pitrou20db5112011-08-19 20:32:34 +02003299 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003300 self.check_interrupted_read_retry(lambda x: x,
3301 mode="r")
3302
3303 @unittest.skipUnless(threading, 'Threading required for this test.')
3304 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3305 """Check that a buffered write, when it gets interrupted (either
3306 returning a partial result or EINTR), properly invokes the signal
3307 handler and retries if the latter returned successfully."""
3308 select = support.import_module("select")
3309 # A quantity that exceeds the buffer size of an anonymous pipe's
3310 # write end.
Antoine Pitroue1a16742013-04-24 23:31:38 +02003311 N = support.PIPE_MAX_SIZE
Antoine Pitrou707ce822011-02-25 21:24:11 +00003312 r, w = os.pipe()
3313 fdopen_kwargs["closefd"] = False
3314 # We need a separate thread to read from the pipe and allow the
3315 # write() to finish. This thread is started after the SIGALRM is
3316 # received (forcing a first EINTR in write()).
3317 read_results = []
3318 write_finished = False
3319 def _read():
3320 while not write_finished:
3321 while r in select.select([r], [], [], 1.0)[0]:
3322 s = os.read(r, 1024)
3323 read_results.append(s)
3324 t = threading.Thread(target=_read)
3325 t.daemon = True
3326 def alarm1(sig, frame):
3327 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003328 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003329 def alarm2(sig, frame):
3330 t.start()
3331 signal.signal(signal.SIGALRM, alarm1)
3332 try:
3333 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003334 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003335 # Expected behaviour:
3336 # - first raw write() is partial (because of the limited pipe buffer
3337 # and the first alarm)
3338 # - second raw write() returns EINTR (because of the second alarm)
3339 # - subsequent write()s are successful (either partial or complete)
3340 self.assertEqual(N, wio.write(item * N))
3341 wio.flush()
3342 write_finished = True
3343 t.join()
3344 self.assertEqual(N, sum(len(x) for x in read_results))
3345 finally:
3346 write_finished = True
3347 os.close(w)
3348 os.close(r)
3349 # This is deliberate. If we didn't close the file descriptor
3350 # before closing wio, wio would try to flush its internal
3351 # buffer, and could block (in case of failure).
3352 try:
3353 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003354 except OSError as e:
Antoine Pitrou707ce822011-02-25 21:24:11 +00003355 if e.errno != errno.EBADF:
3356 raise
3357
Antoine Pitrou20db5112011-08-19 20:32:34 +02003358 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003359 self.check_interrupted_write_retry(b"x", mode="wb")
3360
Antoine Pitrou20db5112011-08-19 20:32:34 +02003361 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003362 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3363
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003364
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003365class CSignalsTest(SignalsTest):
3366 io = io
3367
3368class PySignalsTest(SignalsTest):
3369 io = pyio
3370
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003371 # Handling reentrancy issues would slow down _pyio even more, so the
3372 # tests are disabled.
3373 test_reentrant_write_buffered = None
3374 test_reentrant_write_text = None
3375
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003376
Ezio Melottidaa42c72013-03-23 16:30:16 +02003377def load_tests(*args):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003378 tests = (CIOTest, PyIOTest,
3379 CBufferedReaderTest, PyBufferedReaderTest,
3380 CBufferedWriterTest, PyBufferedWriterTest,
3381 CBufferedRWPairTest, PyBufferedRWPairTest,
3382 CBufferedRandomTest, PyBufferedRandomTest,
3383 StatefulIncrementalDecoderTest,
3384 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3385 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003386 CMiscIOTest, PyMiscIOTest,
3387 CSignalsTest, PySignalsTest,
3388 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003389
3390 # Put the namespaces of the IO module we are testing and some useful mock
3391 # classes in the __dict__ of each test.
3392 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou328ec742010-09-14 18:37:24 +00003393 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003394 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3395 c_io_ns = {name : getattr(io, name) for name in all_members}
3396 py_io_ns = {name : getattr(pyio, name) for name in all_members}
3397 globs = globals()
3398 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3399 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3400 # Avoid turning open into a bound method.
3401 py_io_ns["open"] = pyio.OpenWrapper
3402 for test in tests:
3403 if test.__name__.startswith("C"):
3404 for name, obj in c_io_ns.items():
3405 setattr(test, name, obj)
3406 elif test.__name__.startswith("Py"):
3407 for name, obj in py_io_ns.items():
3408 setattr(test, name, obj)
3409
Ezio Melottidaa42c72013-03-23 16:30:16 +02003410 suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests])
3411 return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00003412
3413if __name__ == "__main__":
Ezio Melottidaa42c72013-03-23 16:30:16 +02003414 unittest.main()