blob: bda59bb3560b9b09ed868deab57e100fd0e467e3 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as an attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Antoine Pitrou712cb732013-12-21 15:51:54 +010038from test.script_helper import assert_python_ok
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010047try:
48 import fcntl
49except ImportError:
50 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000051
def _default_chunk_size():
    """Return the _CHUNK_SIZE of a freshly opened text-mode file object."""
    # Any readable text file will do; this module's own source is always
    # there, and latin-1 can decode arbitrary bytes.
    with open(__file__, "r", encoding="latin-1") as stream:
        chunk_size = stream._CHUNK_SIZE
    return chunk_size
56
57
class MockRawIOWithoutRead:
    """Raw-stream mock that deliberately lacks read().

    Combined with a RawIOBase class, the inherited default read() is what
    gets exercised, and that one is implemented on top of readinto().
    """

    def __init__(self, read_stack=()):
        # Entries still to be served by readinto(); a None entry makes one
        # readinto() call return None.
        self._read_stack = list(read_stack)
        # Every payload handed to write(), in call order.
        self._write_stack = []
        # Total number of read attempts.
        self._reads = 0
        # Read attempts made after the read stack was already empty.
        self._extraneous_reads = 0

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def fileno(self):
        return 42

    def seek(self, pos, whence):
        # wrong but we gotta return something
        return 0

    def tell(self):
        # just as wrong as seek(), for the same reason
        return 0

    def truncate(self, pos=None):
        return pos

    def write(self, b):
        """Record a bytes copy of *b* and report all of it as written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def readinto(self, buf):
        """Copy the next pending entry (or as much as fits) into *buf*.

        Returns the number of bytes stored, None when the next entry is
        None, and 0 once the read stack is exhausted.
        """
        self._reads += 1
        capacity = len(buf)
        if not self._read_stack:
            self._extraneous_reads += 1
            return 0
        chunk = self._read_stack[0]
        if chunk is None:
            self._read_stack.pop(0)
            return None
        size = len(chunk)
        if size > capacity:
            # Serve what fits and keep the remainder for the next call.
            buf[:] = chunk[:capacity]
            self._read_stack[0] = chunk[capacity:]
            return capacity
        self._read_stack.pop(0)
        buf[:size] = chunk
        return size


class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead bound to the C RawIOBase."""


class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead bound to the pure-Python RawIOBase."""
119
120
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead extended with an explicit read().

    read() serves whole entries of the read stack one call at a time and
    ignores the requested size.
    """

    def read(self, n=None):
        """Pop and return the next read-stack entry, or b"" once it is empty.

        *n* is accepted for signature compatibility only.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted stack counts as an extraneous read; any other
            # exception is a genuine error and must not be swallowed.
            self._extraneous_reads += 1
            return b""


class CMockRawIO(MockRawIO, io.RawIOBase):
    pass


class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000136
Guido van Rossuma9e20242007-03-08 00:43:48 +0000137
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods report deliberately bogus results."""

    def write(self, b):
        # Claim twice as many bytes as the parent actually accepted.
        accepted = super().write(b)
        return accepted * 2

    def read(self, n=None):
        # Hand back the parent's result repeated twice.
        payload = super().read(n)
        return payload * 2

    def seek(self, pos, whence):
        # A negative position, whatever was asked for.
        return -123

    def tell(self):
        # Likewise a negative position.
        return -456

    def readinto(self, buf):
        # Let the parent fill the buffer, then report more bytes than the
        # buffer can hold.
        super().readinto(buf)
        return 5 * len(buf)


class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO bound to the C RawIOBase."""


class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO bound to the pure-Python RawIOBase."""
160
161
class CloseFailureIO(MockRawIO):
    """MockRawIO variant whose first close() raises OSError."""

    # Class-level default; close() shadows it on the instance.
    closed = 0

    def close(self):
        # Only the first call fails; later calls are silent no-ops.
        if self.closed:
            return
        self.closed = 1
        raise OSError


class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO bound to the C RawIOBase."""


class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO bound to the pure-Python RawIOBase."""
175
176
class MockFileIO:
    """Mixin that logs the outcome of every read() and readinto() call.

    It must be combined with a class providing the real read()/readinto();
    the bindings below use BytesIO.
    """

    def __init__(self, data):
        # One entry per call: the length of the data returned by read() (or
        # None when read() returned None), or readinto()'s return value.
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count


class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO bound to the C BytesIO."""


class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO bound to the pure-Python BytesIO."""
198
199
class MockUnseekableIO:
    """Mixin that makes a stream report itself as not seekable.

    Concrete subclasses supply the UnsupportedOperation exception class
    raised by seek() and tell().
    """

    def seekable(self):
        return False

    def seek(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def tell(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error


class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    # Exception type of the C implementation.
    UnsupportedOperation = io.UnsupportedOperation


class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    # Exception type of the pure-Python implementation.
    UnsupportedOperation = pyio.UnsupportedOperation
215
216
class MockNonBlockWriterIO:
    """Raw-stream mock whose write() can simulate a write that would block.

    After block_on(char), a write whose data contains *char* is cut short
    just before it; a write that starts with *char* returns None and clears
    the blocker.
    """

    def __init__(self):
        # Payloads accepted by write(), in call order.
        self._write_stack = []
        # Value write() stops at, or None when nothing blocks.
        self._blocker_char = None

    def pop_written(self):
        """Return everything accepted so far as one bytes object and forget it."""
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def write(self, b):
        payload = bytes(b)
        if self._blocker_char:
            try:
                cut = payload.index(self._blocker_char)
            except ValueError:
                # No blocker in this payload: accept all of it below.
                cut = -1
            if cut > 0:
                # write data up to the first blocker
                self._write_stack.append(payload[:cut])
                return cut
            if cut == 0:
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        self._write_stack.append(payload)
        return len(payload)


class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    # Exception type of the C implementation.
    BlockingIOError = io.BlockingIOError


class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    # Exception type of the pure-Python implementation.
    BlockingIOError = pyio.BlockingIOError
266
Guido van Rossuma9e20242007-03-08 00:43:48 +0000267
class IOTest(unittest.TestCase):
    # Implementation-independent tests of open(), FileIO, BytesIO and the
    # abstract IO base classes.
    #
    # The methods look up the objects under test on self (self.open,
    # self.FileIO, self.BytesIO, self.StringIO, self.IOBase, self.RawIOBase,
    # self.BufferedIOBase, self.TextIOBase, self.UnsupportedOperation,
    # self.SEEK_CUR, self.SEEK_END, self.MockRawIOWithoutRead).  None of
    # these are defined in this class; presumably they are injected per
    # implementation elsewhere in the file -- confirm against the test
    # driver.
    #
    # Test methods carry comments rather than docstrings so that unittest's
    # verbose output keeps showing the method names.

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        # Helper: run a fixed write/seek/truncate sequence against the
        # writable binary stream *f*, checking every return value.  The
        # stream ends up containing b"hello world\n".
        self.assertEqual(f.write(b"blah."), 5)
        f.truncate(0)
        # truncate() must not move the stream position
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        # Helper: run a fixed read/readinto/seek sequence against the
        # readable binary stream *f*, which must contain b"hello world\n".
        # With buffered=True, read() without a size is checked as well.
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by the large-file checks.
    LARGE = 2**31

    def large_file_ops(self, f):
        # Helper: seek/write/truncate around self.LARGE on the read-write
        # binary stream *f*.
        # Use unittest assertions rather than assert statements, which are
        # compiled away under -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        # truncate() must not move the stream position
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        exc = self.UnsupportedOperation
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(exc, fp.read)
                self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "wb", buffering=0) as fp:
            self.assertRaises(exc, fp.read)
            self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "rb", buffering=0) as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(exc, fp.write, "blah")
            self.assertRaises(exc, fp.writelines, ["blah\n"])
            # Non-zero seeking from current or end pos
            self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
            self.assertRaises(exc, fp.seek, -1, self.SEEK_END)

    def test_open_handles_NUL_chars(self):
        fn_with_NUL = 'foo\0bar'
        self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
        self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')

    def test_raw_file_io(self):
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; It takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            support.requires(
                'largefile',
                'test requires %s bytes and a long time to run' % self.LARGE)
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # The file must be closed on leaving the with block, whether the
        # block ends normally or through an exception.
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        # Collecting an unclosed FileIO subclass instance must call
        # __del__, close() and flush() in that order, and the written data
        # must be in the file afterwards.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        with support.check_warnings(('', ResourceWarning)):
            f = MyFileIO(support.TESTFN, "wb")
            f.write(b"xxx")
            del f
            support.gc_collect()
            self.assertEqual(record, [1, 2, 3])
            with self.open(support.TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        # Helper: subclass *base*, drop the only reference to an instance
        # and check that __del__, close() and flush() ran in that order.
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        # write() must accept an array and report its size in bytes.
        a = array.array('i', range(10))
        n = len(a.tobytes())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        with support.check_warnings(('', ResourceWarning)):
            f = self.FileIO(support.TESTFN, "wb")
            f.write(b"abcxxx")
            # Put the object in a reference cycle.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise OSError()
        f.flush = bad_flush
        self.assertRaises(OSError, f.close) # exception not swallowed
        self.assertTrue(f.closed)

    def test_multi_close(self):
        # Closing repeatedly is harmless; using the closed file is not.
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")

    def test_types_have_dict(self):
        test = (
            self.IOBase(),
            self.RawIOBase(),
            self.TextIOBase(),
            self.StringIO(),
            self.BytesIO()
        )
        for obj in test:
            self.assertTrue(hasattr(obj, "__dict__"))

    def test_opener(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        fd = os.open(support.TESTFN, os.O_RDONLY)
        # The opener ignores the requested path and hands back the
        # descriptor opened above.
        def opener(path, flags):
            return fd
        with self.open("non-existent", "r", opener=opener) as f:
            self.assertEqual(f.read(), "egg\n")

    def test_fileio_closefd(self):
        # Issue #4841
        with self.open(__file__, 'rb') as f1, \
             self.open(__file__, 'rb') as f2:
            fileio = self.FileIO(f1.fileno(), closefd=False)
            # .__init__() must not close f1
            fileio.__init__(f2.fileno(), closefd=False)
            f1.readline()
            # .close() must not close f2
            fileio.close()
            f2.readline()

    def test_nonbuffered_textio(self):
        # The rejected open() call must not leave anything behind that
        # emits a warning when collected.
        with warnings.catch_warnings(record=True) as recorded:
            with self.assertRaises(ValueError):
                self.open(support.TESTFN, 'w', buffering=0)
            support.gc_collect()
        self.assertEqual(recorded, [])

    def test_invalid_newline(self):
        # Same check for an invalid newline argument.
        with warnings.catch_warnings(record=True) as recorded:
            with self.assertRaises(ValueError):
                self.open(support.TESTFN, 'w', newline='invalid')
            support.gc_collect()
        self.assertEqual(recorded, [])
669
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200670
class CIOTest(IOTest):
    # The IOTest cases plus the Issue #12149 finalization regression test.

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        obj.obj = obj
        wr = weakref.ref(obj)
        del MyIO
        del obj
        support.gc_collect()
        # assertIsNone shows the surviving object if the cycle was not
        # collected.
        self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000690
class PyIOTest(IOTest):
    """IOTest without additional cases of its own."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000693
Guido van Rossuma9e20242007-03-08 00:43:48 +0000694
class CommonBufferedTests:
    """Tests common to BufferedReader, BufferedWriter and BufferedRandom.

    Concrete test classes provide ``tp`` (the buffered class under test)
    together with the raw-stream factories accessed through ``self``.
    """

    def test_detach(self):
        """detach() returns the raw stream once, then raises ValueError."""
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        self.assertIs(buffered.detach(), raw_stream)
        with self.assertRaises(ValueError):
            buffered.detach()

    def test_fileno(self):
        """fileno() of the buffered object over a MockRawIO must be 42."""
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)

        self.assertEqual(42, buffered.fileno())

    @unittest.skip('test having existential crisis')
    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        """seek() rejects whence values -1 and 9."""
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        for bad_whence in (-1, 9):
            with self.assertRaises(ValueError):
                buffered.seek(0, bad_whence)

    def test_override_destructor(self):
        """Overridden __del__, close() and flush() run during destruction.

        Each override appends a marker to a shared list; flush() is only
        expected to be reached when the object reports itself writable.
        """
        base = self.tp
        calls = []

        class MyBufferedIO(base):
            def __del__(self):
                calls.append(1)
                try:
                    inherited = super().__del__
                except AttributeError:
                    pass
                else:
                    inherited()

            def close(self):
                calls.append(2)
                super().close()

            def flush(self):
                calls.append(3)
                super().flush()

        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        expected = [1, 2, 3] if writable else [1, 2]
        self.assertEqual(calls, expected)

    def test_context_manager(self):
        """The object works in a ``with`` block exactly once."""
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        def enter_and_leave():
            with bufio:
                pass

        enter_and_leave()
        # Leaving the block closed bufio, so entering it again must raise
        # a ValueError.
        with self.assertRaises(ValueError):
            enter_and_leave()

    def test_error_through_destructor(self):
        """A destructor does not modify the exception state.

        This must hold even when close() fails.
        """
        rawio = self.CloseFailureIO()

        def touch_missing_attribute():
            self.tp(rawio).xyzzy

        with support.captured_output("stderr") as captured:
            self.assertRaises(AttributeError, touch_missing_attribute)
        output = captured.getvalue().strip()
        if output:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(output.splitlines()), 1)
            self.assertTrue(output.startswith("Exception OSError: "), output)
            self.assertTrue(output.endswith(" ignored"), output)

    def test_repr(self):
        """repr() shows the class name and, once set, the raw stream's name."""
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(buffered), "<%s>" % clsname)
        raw.name = "dummy"
        self.assertEqual(repr(buffered), "<%s name='dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(buffered), "<%s name=b'dummy'>" % clsname)

    def test_flush_error_on_close(self):
        """An OSError from the raw flush() propagates out of close()."""
        raw = self.MockRawIO()

        def bad_flush():
            raise OSError()

        raw.flush = bad_flush
        buffered = self.tp(raw)
        with self.assertRaises(OSError):  # exception not swallowed
            buffered.close()
        self.assertTrue(buffered.closed)

    def test_close_error_on_close(self):
        """close() raises the close error chained to the flush error."""
        raw = self.MockRawIO()

        def bad_flush():
            raise OSError('flush')

        def bad_close():
            raise OSError('close')

        raw.close = bad_close
        buffered = self.tp(raw)
        buffered.flush = bad_flush
        with self.assertRaises(OSError) as caught:  # exception not swallowed
            buffered.close()
        error = caught.exception
        self.assertEqual(error.args, ('close',))
        self.assertIsInstance(error.__context__, OSError)
        self.assertEqual(error.__context__.args, ('flush',))
        self.assertFalse(buffered.closed)

    def test_nonnormalized_close_error_on_close(self):
        """Same as test_close_error_on_close, with NameError (issue #21677)."""
        raw = self.MockRawIO()

        # Both helpers deliberately reference undefined names so that a
        # NameError is raised.
        def bad_flush():
            raise non_existing_flush

        def bad_close():
            raise non_existing_close

        raw.close = bad_close
        buffered = self.tp(raw)
        buffered.flush = bad_flush
        with self.assertRaises(NameError) as caught:  # exception not swallowed
            buffered.close()
        error = caught.exception
        self.assertIn('non_existing_close', str(error))
        self.assertIsInstance(error.__context__, NameError)
        self.assertIn('non_existing_flush', str(error.__context__))
        self.assertFalse(buffered.closed)

    def test_multi_close(self):
        """close() may be called repeatedly; flush() afterwards raises."""
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        for _ in range(3):
            buffered.close()
        with self.assertRaises(ValueError):
            buffered.flush()

    def test_unseekable(self):
        """tell() and seek() raise UnsupportedOperation on MockUnseekableIO."""
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        with self.assertRaises(self.UnsupportedOperation):
            bufio.tell()
        with self.assertRaises(self.UnsupportedOperation):
            bufio.seek(0)

    def test_readonly_attributes(self):
        """Assigning to the ``raw`` attribute raises AttributeError."""
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        replacement = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buffered.raw = replacement
849
Guido van Rossum78892e42007-04-06 17:31:18 +0000850
class SizeofTest:
    """sys.getsizeof() checks for buffered objects (CPython only)."""

    @support.cpython_only
    def test_sizeof(self):
        """The reported size grows by exactly the buffer size difference."""
        small, large = 4096, 8192
        bufio = self.tp(self.MockRawIO(), buffer_size=small)
        overhead = sys.getsizeof(bufio) - small
        bufio = self.tp(self.MockRawIO(), buffer_size=large)
        self.assertEqual(sys.getsizeof(bufio), overhead + large)

    @support.cpython_only
    def test_buffer_freeing(self):
        """After close() the reported size no longer includes the buffer."""
        bufsize = 4096
        bufio = self.tp(self.MockRawIO(), buffer_size=bufsize)
        overhead = sys.getsizeof(bufio) - bufsize
        bufio.close()
        self.assertEqual(sys.getsizeof(bufio), overhead)
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200872
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    """Tests for the buffered reader class supplied as ``tp`` by subclasses."""

    read_mode = "rb"

    def test_constructor(self):
        """__init__ can be re-run; non-positive buffer sizes are rejected."""
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_uninitialized(self):
        """An object made with __new__ only fails cleanly until initialized."""
        # Creating and dropping an uninitialized object must not blow up.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.read, 0)
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.read(0), b'')

    def test_read(self):
        """read(None) and read(7) return all data; read(-2) is invalid."""
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        """read1() is checked against the number of raw reads performed."""
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        """readinto() fills the buffer and reports the byte count."""
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")
        # Same check with a raw stream that yields None after its data.
        rawio = self.MockRawIO((b"abc", None))
        bufio = self.tp(rawio)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"cb")

    def test_readinto1(self):
        """readinto1() is checked against the number of raw reads performed."""
        buffer_size = 10
        rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
        bufio = self.tp(rawio, buffer_size=buffer_size)
        b = bytearray(2)
        self.assertEqual(bufio.peek(3), b'abc')
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 1)
        self.assertEqual(b[:1], b"c")
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"de")
        self.assertEqual(rawio._reads, 2)
        b = bytearray(2*buffer_size)
        self.assertEqual(bufio.peek(3), b'fgh')
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(bufio.readinto1(b), 6)
        self.assertEqual(b[:6], b"fghjkl")
        self.assertEqual(rawio._reads, 4)

    def _check_readinto_array(self, method_name):
        """Read into a multi-byte-element array using *method_name*.

        *method_name* is the name of the bound method to call on the
        buffered reader ("readinto" or "readinto1").
        """
        buffer_size = 60
        data = b"a" * 26
        rawio = self.MockRawIO((data,))
        bufio = self.tp(rawio, buffer_size=buffer_size)

        # Create an array with element size > 1 byte
        b = array.array('i', b'x' * 32)
        self.assertGreater(b.itemsize, 1)

        # Read into it. We should get as many *bytes* as we can fit into b
        # (which is more than the number of elements)
        n = getattr(bufio, method_name)(b)
        self.assertGreater(n, len(b))

        # Check that old contents of b are preserved
        bm = memoryview(b).cast('B')
        self.assertLess(n, len(bm))
        self.assertEqual(bm[:n], data[:n])
        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))

    def test_readinto_array(self):
        """readinto() counts bytes, not elements, for array targets."""
        self._check_readinto_array("readinto")

    def test_readinto1_array(self):
        """readinto1() counts bytes, not elements, for array targets."""
        self._check_readinto_array("readinto1")

    def test_readlines(self):
        """readlines() with no hint, a hint of 5, and a hint of None."""
        def bufio():
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        """Buffered reads translate into the expected raw read sizes."""
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes passed to bufio.read(), and the
        # read history expected on the raw object afterwards.
        tests = [
            [100, [3, 1, 4, 8], [dlen, 0]],
            [100, [3, 3, 3], [dlen]],
            [4, [1, 2, 4, 2], [4, 4, 1]],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        """Reads behave as asserted when the raw stream returns None."""
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())

    def test_read_past_eof(self):
        """Asking for more bytes than exist returns everything available."""
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        """read() without a size returns the whole stream."""
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        """Concurrent readers neither duplicate nor lose any byte."""
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            l = list(range(256)) * N
            random.shuffle(l)
            s = bytes(bytearray(l))
            with self.open(support.TESTFN, "wb") as f:
                f.write(s)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def f():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            s = bufio.read(n)
                            if not s:
                                break
                            # list.append() is atomic
                            results.append(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                s = b''.join(results)
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(s.count(c), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        """tell()/seek() raise UnsupportedOperation before and after a read."""
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        bufio.read(1)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        self.assertRaises(self.UnsupportedOperation, bufio.tell)

    def test_misbehaved_io(self):
        """seek() and tell() raise OSError over a MisbehavedRawIO."""
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)

    def test_no_extraneous_read(self):
        """No raw read is issued once the request has been satisfied."""
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            rawio = self.MockRawIO([b"x" * n])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            # Simple case: one raw read is enough to satisfy the request.
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1143
1144
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    """BufferedReaderTest bound to ``io.BufferedReader``, plus extra checks."""

    tp = io.BufferedReader

    def test_constructor(self):
        """Run the shared constructor test, then try a huge buffer size."""
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                              bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        """After each failed __init__, read() raises ValueError."""
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        """read() raises OSError over a MisbehavedRawIO."""
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(OSError, bufio.read, 10)

    def test_garbage_collection(self):
        """A reader caught in a reference cycle is collected."""
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # Opening in "w+b" mode creates the file; remove it afterwards.
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            f.f = f  # self-reference creates the cycle
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_args_error(self):
        """Too many arguments give a TypeError naming BufferedReader."""
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1191
1192
class PyBufferedReaderTest(BufferedReaderTest):
    """BufferedReaderTest bound to ``pyio.BufferedReader``."""

    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001195
Guido van Rossuma9e20242007-03-08 00:43:48 +00001196
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001197class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1198 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001199
def test_constructor(self):
    """__init__ can be re-run; non-positive buffer sizes are rejected."""
    raw = self.MockRawIO()
    writer = self.tp(raw)
    for init_kwargs in ({}, {"buffer_size": 1024}, {"buffer_size": 16}):
        writer.__init__(raw, **init_kwargs)
    self.assertEqual(3, writer.write(b"abc"))
    writer.flush()
    for bad_size in (0, -16, -1):
        self.assertRaises(ValueError, writer.__init__, raw,
                          buffer_size=bad_size)
    # A valid re-initialization makes the writer usable again.
    writer.__init__(raw)
    self.assertEqual(3, writer.write(b"ghi"))
    writer.flush()
    self.assertEqual(b"".join(raw._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001215
def test_uninitialized(self):
    """An object made with __new__ only fails cleanly until initialized."""
    cls = self.tp
    # Creating and dropping an uninitialized object must not blow up.
    discarded = cls.__new__(cls)
    del discarded
    writer = cls.__new__(cls)
    self.assertRaisesRegex(
        (ValueError, AttributeError),
        'uninitialized|has no attribute',
        writer.write, b'',
    )
    writer.__init__(self.MockRawIO())
    self.assertEqual(writer.write(b''), 0)
1225
def test_detach_flush(self):
    """detach() pushes pending data to the raw stream."""
    raw_stream = self.MockRawIO()
    writer = self.tp(raw_stream)
    writer.write(b"howdy!")
    # Nothing has reached the raw stream before detaching.
    self.assertFalse(raw_stream._write_stack)
    writer.detach()
    self.assertEqual(raw_stream._write_stack, [b"howdy!"])
1233
def test_write(self):
    """A write smaller than the buffer does not reach the raw stream."""
    raw_stream = self.MockRawIO()
    writer = self.tp(raw_stream, 8)
    writer.write(b"abc")
    self.assertFalse(raw_stream._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001240
def test_write_overflow(self):
    """Writing past the buffer size flushes the oldest bytes implicitly."""
    raw_stream = self.MockRawIO()
    writer = self.tp(raw_stream, 8)
    contents = b"abcdefghijklmnop"
    start = 0
    while start < len(contents):
        writer.write(contents[start:start + 3])
        start += 3
    flushed = b"".join(raw_stream._write_stack)
    # At least (total - 8) bytes were implicitly flushed, perhaps more
    # depending on the implementation.
    self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001251
def check_writes(self, intermediate_func):
    """Do many writes of growing size and verify the flushed output.

    *intermediate_func* is called with the buffered writer after every
    single write.
    """
    contents = bytes(range(256)) * 1000
    raw_stream = self.MockRawIO()
    bufio = self.tp(raw_stream, 13)
    # Write sizes: each N is used 15 times before moving on to N+1.
    sizes = (size for size in count(1) for _ in range(15))
    offset = 0
    while offset < len(contents):
        # Never write past the end of the payload.
        size = min(next(sizes), len(contents) - offset)
        self.assertEqual(bufio.write(contents[offset:offset + size]), size)
        intermediate_func(bufio)
        offset += size
    bufio.flush()
    self.assertEqual(contents, b"".join(raw_stream._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001271
def test_writes(self):
    """Run check_writes with nothing done between writes."""
    def do_nothing(bufio):
        return None
    self.check_writes(do_nothing)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001274
def test_writes_and_flushes(self):
    """Run check_writes with a flush() after every write."""
    def flush_it(bufio):
        return bufio.flush()
    self.check_writes(flush_it)
Guido van Rossum01a27522007-03-07 01:00:12 +00001277
def test_writes_and_seeks(self):
    """Run check_writes with seeks that end at the starting position."""
    def seek_absolute(bufio):
        here = bufio.tell()
        for target in (here + 1, here - 1, here):
            bufio.seek(target, 0)

    def seek_relative(bufio):
        here = bufio.seek(0, 1)
        for delta in (+1, -1):
            bufio.seek(delta, 1)
        bufio.seek(here, 0)

    self.check_writes(seek_absolute)
    self.check_writes(seek_relative)
Guido van Rossum01a27522007-03-07 01:00:12 +00001291
def test_writes_and_truncates(self):
    """Run check_writes, truncating at the current position each time."""
    def truncate_here(bufio):
        return bufio.truncate(bufio.tell())
    self.check_writes(truncate_here)
Guido van Rossum01a27522007-03-07 01:00:12 +00001294
def test_write_non_blocking(self):
    """Writes over a raw stream that can block part-way through."""
    raw = self.MockNonBlockWriterIO()
    bufio = self.tp(raw, 8)

    for chunk in (b"abcd", b"efghi"):
        self.assertEqual(bufio.write(chunk), len(chunk))
    # 1 byte will be written, the rest will be buffered
    raw.block_on(b"k")
    self.assertEqual(bufio.write(b"jklmn"), 5)

    # 8 bytes will be written, 8 will be buffered and the rest will be lost
    raw.block_on(b"0")
    try:
        bufio.write(b"opqrwxyz0123456789")
    except self.BlockingIOError as exc:
        accepted = exc.characters_written
    else:
        self.fail("BlockingIOError should have been raised")
    self.assertEqual(accepted, 16)
    self.assertEqual(raw.pop_written(), b"abcdefghijklmnopqrwxyz")

    self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
    flushed = raw.pop_written()
    # Previously buffered bytes were flushed
    self.assertTrue(flushed.startswith(b"01234567A"), flushed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001321
def test_write_and_rewind(self):
    """Seeking back and overwriting lands in the right place."""
    backing = io.BytesIO()
    writer = self.tp(backing, 4)
    self.assertEqual(writer.write(b"abcdef"), 6)
    self.assertEqual(writer.tell(), 6)
    # Overwrite the first two bytes.
    writer.seek(0, 0)
    self.assertEqual(writer.write(b"XY"), 2)
    writer.seek(6, 0)
    self.assertEqual(backing.getvalue(), b"XYcdef")
    # Append after the rewritten region and flush explicitly.
    self.assertEqual(writer.write(b"123456"), 6)
    writer.flush()
    self.assertEqual(backing.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001334
def test_flush(self):
    """flush() pushes buffered bytes to the raw stream."""
    raw_stream = self.MockRawIO()
    writer = self.tp(raw_stream, 8)
    writer.write(b"abc")
    writer.flush()
    first_chunk = raw_stream._write_stack[0]
    self.assertEqual(b"abc", first_chunk)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001341
def test_writelines(self):
    """writelines() accepts a list of bytes objects."""
    lines = [b'ab', b'cd', b'ef']
    raw_stream = self.MockRawIO()
    writer = self.tp(raw_stream, 8)
    writer.writelines(lines)
    writer.flush()
    written = b''.join(raw_stream._write_stack)
    self.assertEqual(written, b'abcdef')
1349
def test_writelines_userlist(self):
    """writelines() accepts a UserList of bytes objects."""
    lines = UserList([b'ab', b'cd', b'ef'])
    raw_stream = self.MockRawIO()
    writer = self.tp(raw_stream, 8)
    writer.writelines(lines)
    writer.flush()
    written = b''.join(raw_stream._write_stack)
    self.assertEqual(written, b'abcdef')
1357
def test_writelines_error(self):
    """writelines() raises TypeError for ints, None and a str."""
    raw_stream = self.MockRawIO()
    writer = self.tp(raw_stream, 8)
    for bad_lines in ([1, 2, 3], None, 'abc'):
        with self.assertRaises(TypeError):
            writer.writelines(bad_lines)
1364
def test_destructor(self):
    """Destroying the writer flushes pending data to the raw stream."""
    raw_stream = self.MockRawIO()
    writer = self.tp(raw_stream, 8)
    writer.write(b"abc")
    # Drop the only reference and force a collection.
    del writer
    support.gc_collect()
    first_chunk = raw_stream._write_stack[0]
    self.assertEqual(b"abc", first_chunk)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001372
def test_truncate(self):
    """truncate() flushes the buffer and leaves the position unchanged."""
    # The test creates a real file; make sure it does not outlive the test.
    self.addCleanup(support.unlink, support.TESTFN)
    # Truncate implicitly flushes the buffer.
    with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
        bufio = self.tp(raw, 8)
        bufio.write(b"abcdef")
        self.assertEqual(bufio.truncate(3), 3)
        self.assertEqual(bufio.tell(), 6)
    with self.open(support.TESTFN, "rb", buffering=0) as f:
        self.assertEqual(f.read(), b"abc")
1382
@unittest.skipUnless(threading, 'Threading required for this test.')
@support.requires_resource('cpu')
def test_threads(self):
    """Bytes written from many threads all end up in the file."""
    try:
        # Write out many bytes from many threads and test they were
        # all flushed.
        N = 1000
        contents = bytes(range(256)) * N
        sizes = cycle([1, 19])
        offset = 0
        queue = deque()
        while offset < len(contents):
            size = next(sizes)
            queue.append(contents[offset:offset + size])
            offset += size
        del contents
        # We use a real file object because it allows us to
        # exercise situations where the GIL is released before
        # writing the buffer to the raw streams. This is in addition
        # to concurrency issues due to switching threads in the middle
        # of Python code.
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            errors = []

            def worker():
                try:
                    while True:
                        try:
                            chunk = queue.popleft()
                        except IndexError:
                            return
                        bufio.write(chunk)
                except Exception as exc:
                    errors.append(exc)
                    raise

            threads = [threading.Thread(target=worker) for _ in range(20)]
            for thread in threads:
                thread.start()
            time.sleep(0.02) # yield
            for thread in threads:
                thread.join()
            self.assertFalse(errors,
                "the following exceptions were caught: %r" % errors)
            bufio.close()
        with self.open(support.TESTFN, "rb") as f:
            written = f.read()
        for value in range(256):
            self.assertEqual(written.count(bytes([value])), N)
    finally:
        support.unlink(support.TESTFN)
1433
def test_misbehaved_io(self):
    # seek(), tell() and write() on top of a misbehaving raw stream must
    # each raise OSError.
    raw = self.MisbehavedRawIO()
    buffered = self.tp(raw, 5)
    with self.assertRaises(OSError):
        buffered.seek(0)
    with self.assertRaises(OSError):
        buffered.tell()
    with self.assertRaises(OSError):
        buffered.write(b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001440
def test_max_buffer_size_removal(self):
    # A third positional argument is rejected with TypeError.
    with self.assertRaises(TypeError):
        raw = self.MockRawIO()
        self.tp(raw, 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001444
def test_write_error_on_close(self):
    # A failing raw write during close() must propagate, and the buffered
    # object must still end up closed.
    def failing_write(data):
        raise OSError()

    raw = self.MockRawIO()
    raw.write = failing_write
    buffered = self.tp(raw)
    buffered.write(b'spam')
    with self.assertRaises(OSError):  # exception not swallowed
        buffered.close()
    self.assertTrue(buffered.closed)
1454
Benjamin Peterson59406a92009-03-26 17:10:29 +00001455
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the shared BufferedWriter tests against io.BufferedWriter and adds
    # a few checks of its own.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        expected_errors = (OverflowError, MemoryError, ValueError)
        self.assertRaises(expected_errors,
                          buffered.__init__, raw, sys.maxsize)

    def test_initialization(self):
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        # After each rejected re-initialisation, writing must fail as well.
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, buffered.__init__, raw,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, buffered.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            writer = self.tp(rawio)
            writer.write(b"123xxx")
            # Reference cycle: only the cyclic collector can free the writer.
            writer.x = writer
            wr = weakref.ref(writer)
            del writer
            support.gc_collect()
        self.assertTrue(wr() is None, wr)
        with self.open(support.TESTFN, "rb") as f:
            flushed = f.read()
            self.assertEqual(flushed, b"123xxx")

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            stream = io.BytesIO()
            self.tp(stream, 1024, 1024, 1024)
1499
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001500
class PyBufferedWriterTest(BufferedWriterTest):
    # Same tests as BufferedWriterTest, run against pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001503
class BufferedRWPairTest(unittest.TestCase):
    # Checks shared by every BufferedRWPair implementation.  ``self.tp`` is
    # the class under test and is supplied by the concrete subclasses.

    def test_constructor(self):
        reader, writer = self.MockRawIO(), self.MockRawIO()
        pair = self.tp(reader, writer)
        self.assertFalse(pair.closed)

    def test_uninitialized(self):
        # An object created with __new__ only can be deleted, rejects
        # read()/write(), and works normally once __init__ has run.
        pair = self.tp.__new__(self.tp)
        del pair
        pair = self.tp.__new__(self.tp)
        expected_errors = (ValueError, AttributeError)
        expected_message = 'uninitialized|has no attribute'
        self.assertRaisesRegex(expected_errors, expected_message,
                               pair.read, 0)
        self.assertRaisesRegex(expected_errors, expected_message,
                               pair.write, b'')
        pair.__init__(self.MockRawIO(), self.MockRawIO())
        self.assertEqual(pair.read(0), b'')
        self.assertEqual(pair.write(b''), 0)

    def test_detach(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        with self.assertRaises(self.UnsupportedOperation):
            pair.detach()

    def test_constructor_max_buffer_size_removal(self):
        with self.assertRaises(TypeError):
            reader = self.MockRawIO()
            writer = self.MockRawIO()
            self.tp(reader, writer, 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        reader = NotReadable()
        writer = self.MockRawIO()
        with self.assertRaises(OSError):
            self.tp(reader, writer)

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        reader = self.MockRawIO()
        writer = NotWriteable()
        with self.assertRaises(OSError):
            self.tp(reader, writer)

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        for size, expected in ((3, b"abc"), (1, b"d")):
            self.assertEqual(pair.read(size), expected)
        self.assertEqual(pair.read(), b"ef")
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        def make_pair():
            # A fresh pair each time so every call starts at offset 0.
            return self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())

        all_lines = [b"abc\n", b"def\n", b"h"]
        self.assertEqual(make_pair().readlines(), all_lines)
        self.assertEqual(make_pair().readlines(), all_lines)
        self.assertEqual(make_pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        target = bytearray(5)
        filled = pair.readinto(target)
        self.assertEqual(filled, 5)
        self.assertEqual(target, b"abcde")

    def test_write(self):
        writer = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), writer)

        # Flush after every write so each one reaches the raw writer
        # as a separate call.
        for chunk in (b"abc", b"def"):
            pair.write(chunk)
            pair.flush()
        self.assertEqual(writer._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        peeked = pair.peek(3)
        self.assertTrue(peeked.startswith(b"abc"))
        # Peeking must not consume anything.
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # The pair reports a tty as soon as either side is one.
        combinations = (
            (False, False),
            (True, False),
            (False, True),
            (True, True),
        )
        for reader_tty, writer_tty in combinations:
            pair = self.tp(SelectableIsAtty(reader_tty),
                           SelectableIsAtty(writer_tty))
            if reader_tty or writer_tty:
                self.assertTrue(pair.isatty())
            else:
                self.assertFalse(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001634
class CBufferedRWPairTest(BufferedRWPairTest):
    # Same tests as BufferedRWPairTest, run against io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001637
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Same tests as BufferedRWPairTest, run against pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001640
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001641
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Combines the reader and writer test suites and adds checks for mixing
    # reads, writes and seeks on a single buffered object.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_uninitialized(self):
        BufferedReaderTest.test_uninitialized(self)
        BufferedWriterTest.test_uninitialized(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        stream = self.tp(raw, 8)

        self.assertEqual(b"as", stream.read(2))
        for chunk in (b"ddd", b"eee"):
            stream.write(chunk)
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", stream.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        stream = self.tp(raw)

        self.assertEqual(b"as", stream.read(2))
        self.assertEqual(2, stream.tell())
        stream.seek(0, 0)
        self.assertEqual(b"asdf", stream.read(4))

        stream.write(b"123f")
        stream.seek(0, 0)
        self.assertEqual(b"asdf123fl", stream.read())
        self.assertEqual(9, stream.tell())
        # Seek relative to the end, then relative to the current position.
        stream.seek(-4, 2)
        self.assertEqual(5, stream.tell())
        stream.seek(2, 1)
        self.assertEqual(7, stream.tell())
        self.assertEqual(b"fl", stream.read(11))
        stream.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        with self.assertRaises(TypeError):
            stream.seek(0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario for the test_flush_and_* tests.  read_func is
        # called as read_func(bufio[, n]) and must return the bytes read.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        # Change the raw stream behind the buffered object's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        def read_via_read(bufio, *args):
            return bufio.read(*args)
        self.check_flush_and_read(read_via_read)

    def test_flush_and_readinto(self):
        def read_via_readinto(bufio, n=-1):
            capacity = 9999 if n < 0 else n
            target = bytearray(capacity)
            filled = bufio.readinto(target)
            return bytes(target[:filled])
        self.check_flush_and_read(read_via_readinto)

    def test_flush_and_peek(self):
        def read_via_peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            data = bufio.peek(n)
            if n != -1:
                data = data[:n]
            # peek() does not advance the position, so do it by hand.
            bufio.seek(len(data), 1)
            return data
        self.check_flush_and_read(read_via_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        for chunk in (b"123", b"45"):
            bufio.write(chunk)
            bufio.flush()
        bufio.seek(0, 0)
        expected = b"12345fghi"
        self.assertEqual(expected, raw.getvalue())
        self.assertEqual(expected, bufio.read())

    def test_threads(self):
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        def peek_in_place(bufio):
            bufio.peek(1)
        self.check_writes(peek_in_place)

        def peek_one_back(bufio):
            # Step back one byte, peek, then restore the position.
            saved_pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(saved_pos, 0)
        self.check_writes(peek_one_back)

    def test_writes_and_reads(self):
        def reread_last_byte(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(reread_last_byte)

    def test_writes_and_read1s(self):
        def reread_last_byte(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(reread_last_byte)

    def test_writes_and_readintos(self):
        def reread_last_byte(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(reread_last_byte)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            position_after_write = overwrite_size + 1
            self.assertEqual(bufio.tell(), position_after_write)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), position_after_write)
            expected = (b"A" + b"B" * overwrite_size
                        + b"A" * (9 - overwrite_size))
            self.assertEqual(raw.getvalue(), expected)

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        original = b"\x80\x81\x82\x83\x84"
        size = len(original)
        for first in range(0, size):
            for second in range(first, size):
                raw = self.BytesIO(original)
                bufio = self.tp(raw, 100)
                mutate(bufio, first, second)
                bufio.flush()
                # The later write (at `first`) wins when both positions match.
                expected = bytearray(original)
                expected[second] = 2
                expected[first] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d"
                                 % (first, second))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as raw:
            with self.tp(raw, 100) as f:
                f.write(b"1")
                self.assertEqual(f.read(1), b'b')
                f.write(b'2')
                self.assertEqual(f.read1(1), b'd')
                f.write(b'3')
                one_byte = bytearray(1)
                f.readinto(one_byte)
                self.assertEqual(one_byte, b'f')
                f.write(b'4')
                self.assertEqual(f.peek(1), b'h')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as raw:
            with self.tp(raw, 100) as f:
                self.assertEqual(f.read(1), b'a')
                f.write(b"2")
                self.assertEqual(f.read(1), b'c')
                f.flush()
                self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        with self.BytesIO(b'ab\ncdef\ng\n') as raw:
            with self.tp(raw) as f:
                # Each write overwrites the first byte of the next line;
                # readline() then returns the rest of that line.
                steps = (
                    (b'1', b'b\n'),
                    (b'2', b'def\n'),
                    (b'3', b'\n'),
                )
                for written, rest_of_line in steps:
                    f.write(written)
                    self.assertEqual(f.readline(), rest_of_line)
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None
1869
R David Murray67bfe802013-02-23 21:51:05 -05001870
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    # Runs the shared BufferedRandom tests against io.BufferedRandom.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        expected_errors = (OverflowError, MemoryError, ValueError)
        self.assertRaises(expected_errors,
                          buffered.__init__, raw, sys.maxsize)

    def test_garbage_collection(self):
        CBufferedReaderTest.test_garbage_collection(self)
        CBufferedWriterTest.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
            stream = io.BytesIO()
            self.tp(stream, 1024, 1024, 1024)
1892
1893
class PyBufferedRandomTest(BufferedRandomTest):
    # Same tests as BufferedRandomTest, run against pyio.BufferedRandom.
    tp = pyio.BufferedRandom
1896
1897
# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
# properties:
#   - A single output character can correspond to many bytes of input.
#   - The number of input bytes to complete the character can be
#     undetermined until the last input byte is received.
#   - The number of input bytes can vary depending on previous input.
#   - A single input byte can correspond to many characters of output.
#   - The number of output characters can be undetermined until the
#     last input byte is received.
#   - The number of output characters can vary depending on previous input.

class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I=1, O=1 and an empty buffer."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered_bytes, flags) with I and O packed into flags."""
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously returned by getstate()."""
        buffer, io = state
        self.buffer = bytearray(buffer)
        i, o = divmod(io, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Decode the bytes in *input* and return the resulting text.

        Bytes of an incomplete word stay in self.buffer.  When *final* is
        true, a non-empty buffer is processed as the last word.
        """
        # Collect the pieces and join once instead of growing a str with +=.
        pieces = []
        for b in input:
            # self.i is re-read on every byte: process_word() may change it.
            if self.i == 0: # variable-length, terminated with period
                if b == ord('.'):
                    if self.buffer:
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer: # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        Control words ('i...' / 'o...') update I or O and produce ''.
        Any other word is returned followed by a period, after being padded
        with hyphens or truncated to O characters when O is non-zero.
        The buffer is emptied in every case.
        """
        word = self.buffer
        output = ''
        if word[0] == ord('i'):
            self.i = min(99, int(word[1:] or 0)) # set input length
        elif word[0] == ord('o'):
            self.o = min(99, int(word[1:] or 0)) # set output length
        else:
            output = word.decode('ascii')
            if self.o:
                # pad out with hyphens, then truncate to output length
                output = output.ljust(self.o, '-')[:self.o]
            output += '.'
        self.buffer = bytearray()
        return output

    # Tests flip this flag to make the 'test_decoder' codec visible.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function; returns None unless enabled and matching.

        When codecEnabled is true and *name* is 'test_decoder', return a
        CodecInfo that encodes as latin-1 and decodes incrementally with
        this class.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)

# Register the previous decoder for testing.
# Disabled by default, tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1997
1998
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.'
         'b----------------------------.'
         'cde--------------------------.'
         'abcdefghijabcde.'
         'a.b------------.'
         '.c.------------.'
         'd.e------------.'
         'k--------------.'
         'l--------------.'
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for data, at_eof, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            decoded = decoder.decode(data, at_eof)
            self.assertEqual(decoded, expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002041
2042class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002043
def setUp(self):
    # Sample bytes with mixed line endings, plus the same text as a str
    # with every line ending written as "\n".
    mixed_newlines = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    only_lf = b"AAA\nBBB\nCCC\nDDD\nEEE\n"
    self.testdata = mixed_newlines
    self.normalized = only_lf.decode("ascii")
    # Start from a clean slate in case an earlier test left the file behind.
    support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002048
def tearDown(self):
    # Remove the scratch file named by support.TESTFN.
    scratch_file = support.TESTFN
    support.unlink(scratch_file)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002051
def test_constructor(self):
    # Re-running __init__ on an existing wrapper must apply the new
    # settings; invalid newline arguments must be rejected.
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    text = self.TextIOWrapper(buffered)

    text.__init__(buffered, encoding="latin-1", newline="\r\n")
    self.assertEqual(text.encoding, "latin-1")
    self.assertEqual(text.line_buffering, False)

    text.__init__(buffered, encoding="utf-8", line_buffering=True)
    self.assertEqual(text.encoding, "utf-8")
    self.assertEqual(text.line_buffering, True)
    self.assertEqual("\xe9\n", text.readline())

    self.assertRaises(TypeError, text.__init__, buffered, newline=42)
    self.assertRaises(ValueError, text.__init__, buffered, newline='xyzzy')
2065
def test_non_text_encoding_codecs_are_rejected(self):
    # Ensure the constructor complains if passed a codec that isn't
    # marked as a text encoding
    # http://bugs.python.org/issue20404
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)
    expected_message = "is not a text encoding"
    with self.assertRaisesRegex(LookupError, expected_message):
        self.TextIOWrapper(buffered, encoding="hex")
2074
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)
    text = self.TextIOWrapper(buffered)
    # detach() hands back the very object that was wrapped.
    self.assertIs(text.detach(), buffered)

    text = self.TextIOWrapper(buffered, encoding="ascii")
    text.write("howdy")
    # Nothing has reached the raw stream yet ...
    self.assertFalse(raw.getvalue())
    # ... but after detach() the written text is there.
    text.detach()
    self.assertEqual(raw.getvalue(), b"howdy")
    # A second detach() is an error.
    with self.assertRaises(ValueError):
        text.detach()
2087
def test_repr(self):
    # The repr gains a name and a mode field as those attributes are set.
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    text = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    expected = "<%s.TextIOWrapper encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    raw.name = "dummy"
    expected = "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    text.mode = "r"
    expected = ("<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>"
                % modname)
    self.assertEqual(repr(text), expected)

    raw.name = b"dummy"
    expected = ("<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>"
                % modname)
    self.assertEqual(repr(text), expected)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002104
def test_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    text = self.TextIOWrapper(buffered, newline="\n", line_buffering=True)
    # (text written, expected content of the raw stream afterwards)
    steps = (
        ("X", b""),                 # No flush happened
        ("Y\nZ", b"XY\nZ"),         # All got flushed
        ("A\rB", b"XY\nZA\rB"),
    )
    for written, expected in steps:
        text.write(written)
        self.assertEqual(raw.getvalue(), expected)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002115
def test_default_encoding(self):
    saved_environ = dict(os.environ)
    try:
        # try to get a user preferred encoding different than the current
        # locale encoding to check that TextIOWrapper() uses the current
        # locale encoding and not the user preferred encoding
        for name in ('LC_ALL', 'LANG', 'LC_CTYPE'):
            os.environ.pop(name, None)

        expected_encoding = locale.getpreferredencoding(False)
        raw = self.BytesIO()
        text = self.TextIOWrapper(raw)
        self.assertEqual(text.encoding, expected_encoding)
    finally:
        # Put the environment back exactly as it was.
        os.environ.clear()
        os.environ.update(saved_environ)
2133
@support.cpython_only
def test_device_encoding(self):
    # Issue 15989: constructing a wrapper around a stream whose fileno()
    # exceeds INT_MAX or UINT_MAX must raise OverflowError.
    import _testcapi
    stream = self.BytesIO()
    for too_big in (_testcapi.INT_MAX + 1, _testcapi.UINT_MAX + 1):
        stream.fileno = lambda value=too_big: value
        self.assertRaises(OverflowError, self.TextIOWrapper, stream)
2143
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002144 def test_encoding(self):
2145 # Check the encoding attribute is always set, and valid
2146 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002147 t = self.TextIOWrapper(b, encoding="utf-8")
2148 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002149 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002150 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002151 codecs.lookup(t.encoding)
2152
2153 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002154 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002155 b = self.BytesIO(b"abc\n\xff\n")
2156 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002157 self.assertRaises(UnicodeError, t.read)
2158 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002159 b = self.BytesIO(b"abc\n\xff\n")
2160 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002161 self.assertRaises(UnicodeError, t.read)
2162 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002163 b = self.BytesIO(b"abc\n\xff\n")
2164 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002165 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002166 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002167 b = self.BytesIO(b"abc\n\xff\n")
2168 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002169 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002170
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002171 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002172 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002173 b = self.BytesIO()
2174 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002175 self.assertRaises(UnicodeError, t.write, "\xff")
2176 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002177 b = self.BytesIO()
2178 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002179 self.assertRaises(UnicodeError, t.write, "\xff")
2180 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002181 b = self.BytesIO()
2182 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002183 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002184 t.write("abc\xffdef\n")
2185 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002186 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002187 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002188 b = self.BytesIO()
2189 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002190 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002191 t.write("abc\xffdef\n")
2192 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002193 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002194
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002195 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002196 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2197
2198 tests = [
2199 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002200 [ '', input_lines ],
2201 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2202 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2203 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002204 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002205 encodings = (
2206 'utf-8', 'latin-1',
2207 'utf-16', 'utf-16-le', 'utf-16-be',
2208 'utf-32', 'utf-32-le', 'utf-32-be',
2209 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002210
Guido van Rossum8358db22007-08-18 21:39:55 +00002211 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002212 # character in TextIOWrapper._pending_line.
2213 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002214 # XXX: str.encode() should return bytes
2215 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002216 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002217 for bufsize in range(1, 10):
2218 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002219 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2220 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002221 encoding=encoding)
2222 if do_reads:
2223 got_lines = []
2224 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002225 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002226 if c2 == '':
2227 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002228 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002229 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002230 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002231 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002232
2233 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002234 self.assertEqual(got_line, exp_line)
2235 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002236
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002237 def test_newlines_input(self):
2238 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002239 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2240 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002241 (None, normalized.decode("ascii").splitlines(keepends=True)),
2242 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002243 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2244 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2245 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002246 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002247 buf = self.BytesIO(testdata)
2248 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002249 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002250 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002251 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002252
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002253 def test_newlines_output(self):
2254 testdict = {
2255 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2256 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2257 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2258 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2259 }
2260 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2261 for newline, expected in tests:
2262 buf = self.BytesIO()
2263 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2264 txt.write("AAA\nB")
2265 txt.write("BB\nCCC\n")
2266 txt.write("X\rY\r\nZ")
2267 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002268 self.assertEqual(buf.closed, False)
2269 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002270
def test_destructor(self):
    # Dropping the wrapper must close the underlying stream, and the
    # pending text must already be in it when close() runs.
    seen_at_close = []
    base = self.BytesIO
    class MyBytesIO(base):
        def close(self):
            # Record what the stream holds at the moment it is closed.
            seen_at_close.append(self.getvalue())
            base.close(self)
    raw = MyBytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.write("abc")
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002284
def test_override_destructor(self):
    # Overridden __del__, close() and flush() must each be invoked, in
    # that order, when the object is finalized.
    calls = []
    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # The base class may not define __del__ at all.
            try:
                parent_del = super().__del__
            except AttributeError:
                return
            parent_del()
        def close(self):
            calls.append(2)
            super().close()
        def flush(self):
            calls.append(3)
            super().flush()
    raw = self.BytesIO()
    wrapper = MyTextIO(raw, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2307
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()
    def access_missing_attribute():
        self.TextIOWrapper(rawio).xyzzy
    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, access_missing_attribute)
    message = captured.getvalue().strip()
    if not message:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(message.splitlines()), 1)
    self.assertTrue(message.startswith("Exception OSError: "), message)
    self.assertTrue(message.endswith(" ignored"), message)
Guido van Rossum8358db22007-08-18 21:39:55 +00002322
Guido van Rossum9b76da62007-04-11 01:09:03 +00002323 # Systematic tests of the text I/O API
2324
def test_basic_io(self):
    # Round-trip a short string through write/read/seek/tell for a range
    # of chunk sizes and encodings.  Every file is managed by a "with"
    # block so it is closed even when an assertion fails part-way.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        # "utf-16-be" and "utf-16-le" are currently not exercised here.
        for enc in "ascii", "latin-1", "utf-8":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position just past "abc"; reused for the checks below.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2353
def multi_line_test(self, f, enc):
    # Helper: empty *f*, write lines of assorted lengths while recording
    # tell() before each one, then read them back and check that both the
    # positions and the contents match.  *enc* is accepted but not used.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    wlines = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        line = "".join(sample[i % len(sample)] for i in range(size)) + "\n"
        wlines.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    rlines = []
    # tell() is taken before each readline(); an empty line ends the loop.
    pos, line = f.tell(), f.readline()
    while line:
        rlines.append((pos, line))
        pos, line = f.tell(), f.readline()
    self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002375
def test_telling(self):
    # tell() must follow readline() and be refused during iteration.
    # The file is managed by "with" so it is closed even when an
    # assertion fails.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # While iterating over the file, tell() must raise.
            self.assertRaises(OSError, f.tell)
        # Once iteration is over, tell() works again.
        self.assertEqual(f.tell(), p2)
2395
def test_seeking(self):
    # Put a multi-byte character right after an ASCII prefix that is two
    # characters shorter than the default chunk size.
    prefix_size = _default_chunk_size() - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    line = prefix + u_suffix.encode("utf-8")
    with self.open(support.TESTFN, "wb") as writer:
        writer.write(line*2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as reader:
        head = reader.read(prefix_size)
        self.assertEqual(head, str(prefix, "ascii"))
        self.assertEqual(reader.tell(), prefix_size)
        self.assertEqual(reader.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002412
def test_seeking_too(self):
    # Regression test for a specific bug
    target = support.TESTFN
    with self.open(target, "wb") as writer:
        writer.write(b'\xe0\xbf\xbf\n')
    with self.open(target, "r", encoding="utf-8") as reader:
        reader._CHUNK_SIZE  # Just test that it exists
        reader._CHUNK_SIZE = 2
        # readline() followed by tell() must simply not fail.
        reader.readline()
        reader.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002423
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened in a "with" block so that it is closed
        # even when one of the assertions below fails.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1): # seek positions
            for j in [1, 5, len(decoded) - i]: # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    # Going back to the cookie must replay the same text.
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(input)

        # Position each test case so that it crosses a chunk boundary.
        for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(input)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + input, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002470
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002471 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002472 data = "1234567890"
2473 tests = ("utf-16",
2474 "utf-16-le",
2475 "utf-16-be",
2476 "utf-32",
2477 "utf-32-le",
2478 "utf-32-be")
2479 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002480 buf = self.BytesIO()
2481 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002482 # Check if the BOM is written only once (see issue1753).
2483 f.write(data)
2484 f.write(data)
2485 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002486 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002487 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002488 self.assertEqual(f.read(), data * 2)
2489 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002490
Benjamin Petersona1b49012009-03-31 23:11:32 +00002491 def test_unreadable(self):
2492 class UnReadable(self.BytesIO):
2493 def readable(self):
2494 return False
2495 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002496 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00002497
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002498 def test_read_one_by_one(self):
2499 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002500 reads = ""
2501 while True:
2502 c = txt.read(1)
2503 if not c:
2504 break
2505 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002506 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002507
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002508 def test_readlines(self):
2509 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2510 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2511 txt.seek(0)
2512 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2513 txt.seek(0)
2514 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2515
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002516 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002517 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002518 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002519 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002520 reads = ""
2521 while True:
2522 c = txt.read(128)
2523 if not c:
2524 break
2525 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002526 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002527
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002528 def test_writelines(self):
2529 l = ['ab', 'cd', 'ef']
2530 buf = self.BytesIO()
2531 txt = self.TextIOWrapper(buf)
2532 txt.writelines(l)
2533 txt.flush()
2534 self.assertEqual(buf.getvalue(), b'abcdef')
2535
2536 def test_writelines_userlist(self):
2537 l = UserList(['ab', 'cd', 'ef'])
2538 buf = self.BytesIO()
2539 txt = self.TextIOWrapper(buf)
2540 txt.writelines(l)
2541 txt.flush()
2542 self.assertEqual(buf.getvalue(), b'abcdef')
2543
2544 def test_writelines_error(self):
2545 txt = self.TextIOWrapper(self.BytesIO())
2546 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2547 self.assertRaises(TypeError, txt.writelines, None)
2548 self.assertRaises(TypeError, txt.writelines, b'abc')
2549
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002550 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002551 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002552
2553 # read one char at a time
2554 reads = ""
2555 while True:
2556 c = txt.read(1)
2557 if not c:
2558 break
2559 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002560 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002561
2562 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002563 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002564 txt._CHUNK_SIZE = 4
2565
2566 reads = ""
2567 while True:
2568 c = txt.read(4)
2569 if not c:
2570 break
2571 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002572 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002573
2574 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002575 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002576 txt._CHUNK_SIZE = 4
2577
2578 reads = txt.read(4)
2579 reads += txt.read(4)
2580 reads += txt.readline()
2581 reads += txt.readline()
2582 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002583 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002584
2585 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002586 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002587 txt._CHUNK_SIZE = 4
2588
2589 reads = txt.read(4)
2590 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002591 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002592
2593 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002594 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002595 txt._CHUNK_SIZE = 4
2596
2597 reads = txt.read(4)
2598 pos = txt.tell()
2599 txt.seek(0)
2600 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002601 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002602
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002603 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002604 buffer = self.BytesIO(self.testdata)
2605 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002606
2607 self.assertEqual(buffer.seekable(), txt.seekable())
2608
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    filename = support.TESTFN

    def check_content(text, charset):
        # The raw bytes must equal *text* encoded once with *charset*.
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), text.encode(charset))

    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            f.tell()
        check_content('aaa', charset)

        with self.open(filename, 'a', encoding=charset) as f:
            f.write('xxx')
        check_content('aaaxxx', charset)
Antoine Pitroue4501852009-05-14 18:55:55 +00002623
def test_seek_bom(self):
    # Same test, but when seeking manually
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as writer:
            writer.write('aaa')
            end_of_text = writer.tell()
        with self.open(filename, 'r+', encoding=charset) as updater:
            # Append at the old end first, then overwrite from the start.
            updater.seek(end_of_text)
            updater.write('zzz')
            updater.seek(0)
            updater.write('bbb')
        with self.open(filename, 'rb') as reader:
            self.assertEqual(reader.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002638
def test_errors_property(self):
    # Pairs of (extra open() keyword arguments, expected .errors value).
    cases = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for options, expected in cases:
        with self.open(support.TESTFN, "w", **options) as f:
            self.assertEqual(f.errors, expected)
2644
@support.no_tracing
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    thread_count = 20
    start_signal = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def run(n):
            text = "Thread%03d\n" % n
            # Hold every thread back until all of them have been started.
            start_signal.wait()
            f.write(text)
        workers = [threading.Thread(target=run, args=(x,))
                   for x in range(thread_count)]
        for worker in workers:
            worker.start()
        time.sleep(0.02)
        start_signal.set()
        for worker in workers:
            worker.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
        # Each thread's line must appear exactly once.
        for n in range(thread_count):
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002667
Antoine Pitrou6be88762010-05-03 16:48:20 +00002668 def test_flush_error_on_close(self):
2669 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2670 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002671 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00002672 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002673 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06002674 self.assertTrue(txt.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00002675
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03002676 def test_close_error_on_close(self):
2677 buffer = self.BytesIO(self.testdata)
2678 def bad_flush():
2679 raise OSError('flush')
2680 def bad_close():
2681 raise OSError('close')
2682 buffer.close = bad_close
2683 txt = self.TextIOWrapper(buffer, encoding="ascii")
2684 txt.flush = bad_flush
2685 with self.assertRaises(OSError) as err: # exception not swallowed
2686 txt.close()
2687 self.assertEqual(err.exception.args, ('close',))
2688 self.assertIsInstance(err.exception.__context__, OSError)
2689 self.assertEqual(err.exception.__context__.args, ('flush',))
2690 self.assertFalse(txt.closed)
2691
2692 def test_nonnormalized_close_error_on_close(self):
2693 # Issue #21677
2694 buffer = self.BytesIO(self.testdata)
2695 def bad_flush():
2696 raise non_existing_flush
2697 def bad_close():
2698 raise non_existing_close
2699 buffer.close = bad_close
2700 txt = self.TextIOWrapper(buffer, encoding="ascii")
2701 txt.flush = bad_flush
2702 with self.assertRaises(NameError) as err: # exception not swallowed
2703 txt.close()
2704 self.assertIn('non_existing_close', str(err.exception))
2705 self.assertIsInstance(err.exception.__context__, NameError)
2706 self.assertIn('non_existing_flush', str(err.exception.__context__))
2707 self.assertFalse(txt.closed)
2708
Antoine Pitrou6be88762010-05-03 16:48:20 +00002709 def test_multi_close(self):
2710 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2711 txt.close()
2712 txt.close()
2713 txt.close()
2714 self.assertRaises(ValueError, txt.flush)
2715
def test_unseekable(self):
    # tell() and seek() must both raise UnsupportedOperation when the
    # wrapper sits on a MockUnseekableIO stream.
    txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    with self.assertRaises(self.UnsupportedOperation):
        txt.tell()
    with self.assertRaises(self.UnsupportedOperation):
        txt.seek(0)
2720
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002721 def test_readonly_attributes(self):
2722 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2723 buf = self.BytesIO(self.testdata)
2724 with self.assertRaises(AttributeError):
2725 txt.buffer = buf
2726
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    txt = self.TextIOWrapper(self.MockRawIO(chunks), encoding='ascii',
                             newline='\n')
    # Reads
    self.assertEqual(txt.read(4), 'abcd')
    self.assertEqual(txt.readline(), 'efghi\n')
    remaining = list(txt)
    self.assertEqual(remaining, ['jkl\n', 'opq\n'])
2737
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                             write_through=True)
    for piece in ('1', '23\n4', '5'):
        txt.write(piece)
    # Everything written must already be on the mock's write stack.
    self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2747
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02002748 def test_bufio_write_through(self):
2749 # Issue #21396: write_through=True doesn't force a flush()
2750 # on the underlying binary buffered object.
2751 flush_called, write_called = [], []
2752 class BufferedWriter(self.BufferedWriter):
2753 def flush(self, *args, **kwargs):
2754 flush_called.append(True)
2755 return super().flush(*args, **kwargs)
2756 def write(self, *args, **kwargs):
2757 write_called.append(True)
2758 return super().write(*args, **kwargs)
2759
2760 rawio = self.BytesIO()
2761 data = b"a"
2762 bufio = BufferedWriter(rawio, len(data)*2)
2763 textio = self.TextIOWrapper(bufio, encoding='ascii',
2764 write_through=True)
2765 # write to the buffered io but don't overflow the buffer
2766 text = data.decode('ascii')
2767 textio.write(text)
2768
2769 # buffer.flush is not called with write_through=True
2770 self.assertFalse(flush_called)
2771 # buffer.write *is* called with write_through=True
2772 self.assertTrue(write_called)
2773 self.assertEqual(rawio.getvalue(), b"") # no flush
2774
2775 write_called = [] # reset
2776 textio.write(text * 10) # total content is larger than bufio buffer
2777 self.assertTrue(write_called)
2778 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
2779
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002780 def test_read_nonbytes(self):
2781 # Issue #17106
2782 # Crash when underlying read() returns non-bytes
2783 t = self.TextIOWrapper(self.StringIO('a'))
2784 self.assertRaises(TypeError, t.read, 1)
2785 t = self.TextIOWrapper(self.StringIO('a'))
2786 self.assertRaises(TypeError, t.readline)
2787 t = self.TextIOWrapper(self.StringIO('a'))
2788 self.assertRaises(TypeError, t.read)
2789
2790 def test_illegal_decoder(self):
2791 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10002792 # Bypass the early encoding check added in issue 20404
2793 def _make_illegal_wrapper():
2794 quopri = codecs.lookup("quopri")
2795 quopri._is_text_encoding = True
2796 try:
2797 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
2798 newline='\n', encoding="quopri")
2799 finally:
2800 quopri._is_text_encoding = False
2801 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002802 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10002803 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002804 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10002805 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002806 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10002807 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002808 self.assertRaises(TypeError, t.read)
2809
def _check_create_at_shutdown(self, **kwargs):
    """Run a child interpreter whose script creates a TextIOWrapper from
    inside a __del__ method, and return assert_python_ok()'s result.

    Issue #20037: creating a TextIOWrapper at shutdown shouldn't crash
    the interpreter.  *kwargs* is interpolated into the child script as
    the keyword arguments passed to the TextIOWrapper constructor.
    """
    template = """if 1:
            import codecs
            import {iomod} as io

            # Avoid looking up codecs at shutdown
            codecs.lookup('utf-8')

            class C:
                def __init__(self):
                    self.buf = io.BytesIO()
                def __del__(self):
                    io.TextIOWrapper(self.buf, **{kwargs})
                    print("ok")
            c = C()
            """
    script = template.format(iomod=self.io.__name__, kwargs=kwargs)
    return assert_python_ok("-c", script)
2830
def test_create_at_shutdown_without_encoding(self):
    rc, out, err = self._check_create_at_shutdown()
    if not err:
        self.assertEqual("ok", out.decode().strip())
        return
    # Can error out with a RuntimeError if the module state
    # isn't found.
    self.assertIn(self.shutdown_error, err.decode())
2839
def test_create_at_shutdown_with_encoding(self):
    # With an explicit encoding and error handler the child interpreter
    # must print "ok" and write nothing to stderr.
    options = {"encoding": 'utf-8', "errors": 'strict'}
    rc, out, err = self._check_create_at_shutdown(**options)
    self.assertFalse(err)
    self.assertEqual("ok", out.decode().strip())
2845
def test_read_byteslike(self):
    source = MemviewBytesIO(b'Just some random string\n')
    wrapper = self.TextIOWrapper(source, 'utf-8')

    # TextIOwrapper will not read the full string, because
    # we truncate it to a multiple of the native int size
    # so that we can construct a more complex memoryview.
    expected_bytes = _to_memoryview(source.getvalue()).tobytes()
    expected_text = expected_bytes.decode('utf-8')

    self.assertEqual(wrapper.read(200), expected_text)
2856
class MemviewBytesIO(io.BytesIO):
    '''BytesIO variant whose read() and read1() hand back memoryviews
    (built by _to_memoryview) instead of bytes objects'''

    def read1(self, len_):
        chunk = super().read1(len_)
        return _to_memoryview(chunk)

    def read(self, len_):
        chunk = super().read(len_)
        return _to_memoryview(chunk)
2866
2867def _to_memoryview(buf):
2868 '''Convert bytes-object *buf* to a non-trivial memoryview'''
2869
2870 arr = array.array('i')
2871 idx = len(buf) - len(buf) % arr.itemsize
2872 arr.frombytes(buf[:idx])
2873 return memoryview(arr)
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002874
class CTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapper tests run against the ``io`` module."""

    io = io
    shutdown_error = "RuntimeError: could not find io module state"

    def test_initialization(self):
        # After a failed re-initialisation (bad ``newline`` value) the
        # wrapper must refuse to read.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        for exc_type, bad_newline in ((TypeError, 42), (ValueError, 'xyzzy')):
            self.assertRaises(exc_type, wrapper.__init__, buffered,
                              newline=bad_newline)
            self.assertRaises(ValueError, wrapper.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            rawio = io.FileIO(support.TESTFN, "wb")
            buffered = self.BufferedWriter(rawio)
            wrapper = self.TextIOWrapper(buffered, encoding="ascii")
            wrapper.write("456def")
            # Put the wrapper in a reference cycle so only the GC frees it.
            wrapper.x = wrapper
            ref = weakref.ref(wrapper)
            del wrapper
            support.gc_collect()
        self.assertIsNone(ref(), ref)
        with self.open(support.TESTFN, "rb") as f:
            contents = f.read()
            self.assertEqual(contents, b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text1 = self.TextIOWrapper(pair1, encoding="ascii")
            pair2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text2 = self.TextIOWrapper(pair2, encoding="ascii")
            # circular references
            text1.buddy = text2
            text2.buddy = text1
        support.gc_collect()
2918
2919
class PyTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapper tests run against the ``pyio`` module."""

    io = pyio
    # Message looked for in the child's stderr by
    # test_create_at_shutdown_without_encoding.  An earlier expectation was
    # "LookupError: unknown encoding: ascii".
    shutdown_error = "TypeError: 'NoneType' object is not iterable"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002924
2925
2926class IncrementalNewlineDecoderTest(unittest.TestCase):
2927
2928 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002929 # UTF-8 specific tests for a newline decoder
2930 def _check_decode(b, s, **kwargs):
2931 # We exercise getstate() / setstate() as well as decode()
2932 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002933 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002934 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002935 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002936
Antoine Pitrou180a3362008-12-14 16:36:46 +00002937 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002938
Antoine Pitrou180a3362008-12-14 16:36:46 +00002939 _check_decode(b'\xe8', "")
2940 _check_decode(b'\xa2', "")
2941 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002942
Antoine Pitrou180a3362008-12-14 16:36:46 +00002943 _check_decode(b'\xe8', "")
2944 _check_decode(b'\xa2', "")
2945 _check_decode(b'\x88', "\u8888")
2946
2947 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002948 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2949
Antoine Pitrou180a3362008-12-14 16:36:46 +00002950 decoder.reset()
2951 _check_decode(b'\n', "\n")
2952 _check_decode(b'\r', "")
2953 _check_decode(b'', "\n", final=True)
2954 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002955
Antoine Pitrou180a3362008-12-14 16:36:46 +00002956 _check_decode(b'\r', "")
2957 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002958
Antoine Pitrou180a3362008-12-14 16:36:46 +00002959 _check_decode(b'\r\r\n', "\n\n")
2960 _check_decode(b'\r', "")
2961 _check_decode(b'\r', "\n")
2962 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002963
Antoine Pitrou180a3362008-12-14 16:36:46 +00002964 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2965 _check_decode(b'\xe8\xa2\x88', "\u8888")
2966 _check_decode(b'\n', "\n")
2967 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2968 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002969
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002970 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002971 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002972 if encoding is not None:
2973 encoder = codecs.getincrementalencoder(encoding)()
2974 def _decode_bytewise(s):
2975 # Decode one byte at a time
2976 for b in encoder.encode(s):
2977 result.append(decoder.decode(bytes([b])))
2978 else:
2979 encoder = None
2980 def _decode_bytewise(s):
2981 # Decode one char at a time
2982 for c in s:
2983 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002984 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002985 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002986 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00002987 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002988 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002989 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002990 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002991 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002992 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002993 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002994 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002995 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002996 input = "abc"
2997 if encoder is not None:
2998 encoder.reset()
2999 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003000 self.assertEqual(decoder.decode(input), "abc")
3001 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003002
3003 def test_newline_decoder(self):
3004 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003005 # None meaning the IncrementalNewlineDecoder takes unicode input
3006 # rather than bytes input
3007 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00003008 'utf-16', 'utf-16-le', 'utf-16-be',
3009 'utf-32', 'utf-32-le', 'utf-32-be',
3010 )
3011 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003012 decoder = enc and codecs.getincrementaldecoder(enc)()
3013 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3014 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003015 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003016 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3017 self.check_newline_decoding_utf8(decoder)
3018
Antoine Pitrou66913e22009-03-06 23:40:56 +00003019 def test_newline_bytes(self):
3020 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3021 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003022 self.assertEqual(dec.newlines, None)
3023 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3024 self.assertEqual(dec.newlines, None)
3025 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3026 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00003027 dec = self.IncrementalNewlineDecoder(None, translate=False)
3028 _check(dec)
3029 dec = self.IncrementalNewlineDecoder(None, translate=True)
3030 _check(dec)
3031
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Variant of the shared decoder tests; adds nothing of its own.

    NOTE(review): presumably the C IncrementalNewlineDecoder is bound to
    this class elsewhere in the file -- confirm.
    """
3034
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Variant of the shared decoder tests; adds nothing of its own.

    NOTE(review): presumably the pure-Python IncrementalNewlineDecoder is
    bound to this class elsewhere in the file -- confirm.
    """
Antoine Pitrou180a3362008-12-14 16:36:46 +00003037
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003038
Guido van Rossum01a27522007-03-07 01:00:12 +00003039# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003040
class MiscIOTest(unittest.TestCase):
    """Miscellaneous checks of the module bound to ``self.io``.

    Subclasses set ``io``; names such as ``self.open`` or ``self.IOBase``
    are looked up on the test instance.
    """

    def tearDown(self):
        support.unlink(support.TESTFN)

    def test___all__(self):
        # Every exported name must exist and be of the expected kind.
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertIsNotNone(obj, name)
            if name == "open":
                continue
            elif "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase), name)

    def test_attributes(self):
        # Files are opened through ``with`` so that a failing assertion
        # cannot leave TESTFN open when tearDown() unlinks it.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        with support.check_warnings(('', DeprecationWarning)):
            f = self.open(support.TESTFN, "U")
        with f:
            self.assertEqual(f.name, support.TESTFN)
            self.assertEqual(f.buffer.name, support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode, "U")
            self.assertEqual(f.buffer.mode, "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        with self.open(support.TESTFN, "w+") as f:
            self.assertEqual(f.mode, "w+")
            self.assertEqual(f.buffer.mode, "rb+")  # Does it really matter?
            self.assertEqual(f.buffer.raw.mode, "rb+")

            # A second file object over the same descriptor; closefd=False
            # leaves ownership of the descriptor with ``f``.
            with self.open(f.fileno(), "wb", closefd=False) as g:
                self.assertEqual(g.mode, "wb")
                self.assertEqual(g.raw.mode, "wb")
                self.assertEqual(g.name, f.fileno())
                self.assertEqual(g.raw.name, f.fileno())

    def test_io_after_close(self):
        # Every I/O method of a closed file must raise ValueError.
        # The write modes come first so that TESTFN exists for the read modes.
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            # Optional methods are only checked where the object has them.
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
            if hasattr(f, "readall"):
                self.assertRaises(ValueError, f.readall)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            if hasattr(f, "readinto1"):
                self.assertRaises(ValueError, f.readinto1, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            self.assertRaises(ValueError, f.write,
                              b"" if "b" in kwargs['mode'] else "")
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        class C(str):
            pass
        c = C("")
        b = self.BlockingIOError(1, c)
        # Tie the exception and its argument into a reference cycle; the
        # collector must still be able to free both.
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        self.assertIsInstance(self.IOBase, abc.ABCMeta)
        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        """Check unbuffered, buffered and text files against the ABCs
        exposed as attributes of *abcmodule*."""
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "wb") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "w") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertIsInstance(f, abcmodule.TextIOBase)

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)

    def _check_warn_on_dealloc(self, *args, **kwargs):
        """Open a file with the given arguments, drop the only reference
        without closing it, and check that the ResourceWarning emitted
        mentions the file's repr."""
        # NOTE(review): this calls the builtin open(), not self.open, so
        # both subclasses exercise the same implementation -- confirm
        # whether that is intended.
        f = open(*args, **kwargs)
        r = repr(f)
        with self.assertWarns(ResourceWarning) as cm:
            f = None
            support.gc_collect()
        self.assertIn(r, str(cm.warning.args[0]))

    def test_warn_on_dealloc(self):
        self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
        self._check_warn_on_dealloc(support.TESTFN, "wb")
        self._check_warn_on_dealloc(support.TESTFN, "w")

    def _check_warn_on_dealloc_fd(self, *args, **kwargs):
        """Same check as _check_warn_on_dealloc() for a file opened from a
        pipe descriptor, plus the closefd=False case where no warning is
        expected."""
        fds = []
        def cleanup_fds():
            # Some descriptors were already closed by the file objects;
            # EBADF is therefore expected and ignored.
            for fd in fds:
                try:
                    os.close(fd)
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise
        self.addCleanup(cleanup_fds)
        r, w = os.pipe()
        fds += r, w
        self._check_warn_on_dealloc(r, *args, **kwargs)
        # When using closefd=False, there's no warning
        r, w = os.pipe()
        fds += r, w
        with warnings.catch_warnings(record=True) as recorded:
            open(r, *args, closefd=False, **kwargs)
            support.gc_collect()
        self.assertEqual(recorded, [])

    def test_warn_on_dealloc_fd(self):
        self._check_warn_on_dealloc_fd("rb", buffering=0)
        self._check_warn_on_dealloc_fd("rb")
        self._check_warn_on_dealloc_fd("r")

    def test_pickling(self):
        # Pickling file objects is forbidden
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+b", "buffering": 0},
            ]:
            for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
                with self.open(support.TESTFN, **kwargs) as f:
                    self.assertRaises(TypeError, pickle.dumps, f, protocol)

    @unittest.skipUnless(fcntl, 'fcntl required for this test')
    def test_nonblock_pipe_write_bigbuf(self):
        self._test_nonblock_pipe_write(16*1024)

    @unittest.skipUnless(fcntl, 'fcntl required for this test')
    def test_nonblock_pipe_write_smallbuf(self):
        self._test_nonblock_pipe_write(1024)

    def _set_non_blocking(self, fd):
        """Add O_NONBLOCK to the status flags of descriptor *fd*."""
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        self.assertNotEqual(flags, -1)
        res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        self.assertEqual(res, 0)

    def _test_nonblock_pipe_write(self, bufsize):
        """Write through a non-blocking pipe with buffered files of size
        *bufsize* and check that everything sent is received."""
        sent = []
        received = []
        r, w = os.pipe()
        self._set_non_blocking(r)
        self._set_non_blocking(w)

        # To exercise all code paths in the C implementation we need
        # to play with buffer sizes.  For instance, if we choose a
        # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
        # then we will never get a partial write of the buffer.
        rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
        wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

        with rf, wf:
            for N in 9999, 73, 7574:
                try:
                    i = 0
                    # Keep writing until the pipe refuses more data.
                    while True:
                        msg = bytes([i % 26 + 97]) * N
                        sent.append(msg)
                        wf.write(msg)
                        i += 1

                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.args[2], e.characters_written)
                    # Only the accepted prefix of the last message counts
                    # as sent.
                    sent[-1] = sent[-1][:e.characters_written]
                    received.append(rf.read())
                    msg = b'BLOCKED'
                    wf.write(msg)
                    sent.append(msg)

            # Flush whatever is still buffered, draining the pipe each
            # time the flush would block.
            while True:
                try:
                    wf.flush()
                    break
                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.args[2], e.characters_written)
                    self.assertEqual(e.characters_written, 0)
                    received.append(rf.read())

            # Read until rf.read() returns None.
            received += iter(rf.read, None)

        sent, received = b''.join(sent), b''.join(received)
        self.assertTrue(sent == received)
        self.assertTrue(wf.closed)
        self.assertTrue(rf.closed)

    def test_create_fail(self):
        # 'x' mode fails if file is existing
        with self.open(support.TESTFN, 'w'):
            pass
        self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')

    def test_create_writes(self):
        # 'x' mode opens for writing
        with self.open(support.TESTFN, 'xb') as f:
            f.write(b"spam")
        with self.open(support.TESTFN, 'rb') as f:
            self.assertEqual(b"spam", f.read())

    def test_open_allargs(self):
        # there used to be a buffer overflow in the parser for rawmode
        self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
3313
3314
class CMiscIOTest(MiscIOTest):
    """Miscellaneous tests run against the ``io`` module."""

    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025
        class OversizedReader(self.io.BufferedIOBase):
            # read() returns far more data than any caller asked for.
            def read(self, n=-1):
                return b'x' * 10**6

        reader = OversizedReader()
        target = bytearray(2)
        with self.assertRaises(ValueError):
            reader.readinto(target)
3326
class PyMiscIOTest(MiscIOTest):
    """Miscellaneous tests run against the ``pyio`` module."""

    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003329
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003330
3331@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3332class SignalsTest(unittest.TestCase):
3333
def setUp(self):
    # Install alarm_interrupt() for SIGALRM and remember the handler it
    # replaces so that tearDown() can put it back.
    previous_handler = signal.signal(signal.SIGALRM, self.alarm_interrupt)
    self.oldalrm = previous_handler
3336
def tearDown(self):
    # Put back the SIGALRM handler saved by setUp().
    saved_handler = self.oldalrm
    signal.signal(signal.SIGALRM, saved_handler)
3339
def alarm_interrupt(self, sig, frame):
    """SIGALRM handler installed by setUp(): always raises
    ZeroDivisionError."""
    1/0
3342
@unittest.skipUnless(threading, 'Threading required for this test.')
def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
    """Check that a partial write, when it gets interrupted, properly
    invokes the signal handler, and bubbles up the exception raised
    in the latter.

    *item* is the unit that gets written repeatedly; *bytes* is what is
    expected to come out of the pipe.  Remaining keyword arguments are
    passed to ``self.io.open``.
    """
    observed = []

    def _read_one_byte():
        # Keep SIGALRM out of the helper thread where the platform allows.
        if hasattr(signal, 'pthread_sigmask'):
            signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
        observed.append(os.read(r, 1))

    reader = threading.Thread(target=_read_one_byte)
    reader.daemon = True
    r, w = os.pipe()
    fdopen_kwargs["closefd"] = False
    try:
        wio = self.io.open(w, **fdopen_kwargs)
        reader.start()
        # Fill the pipe enough that the write will be blocking.
        # It will be interrupted by the timer armed just below.  Since the
        # other thread has read one byte, the low-level write will
        # return with a successful (partial) result rather than an EINTR.
        # The buffered IO layer must check for pending signal
        # handlers, which in this case will invoke alarm_interrupt().
        repetitions = support.PIPE_MAX_SIZE // len(item) + 1
        signal.alarm(1)
        try:
            self.assertRaises(ZeroDivisionError, wio.write, item * repetitions)
        finally:
            signal.alarm(0)
        reader.join()
        # We got one byte, get another one and check that it isn't a
        # repeat of the first one.
        observed.append(os.read(r, 1))
        self.assertEqual(observed, [bytes[0:1], bytes[1:2]])
    finally:
        os.close(w)
        os.close(r)
        # This is deliberate. If we didn't close the file descriptor
        # before closing wio, wio would try to flush its internal
        # buffer, and block again.
        try:
            wio.close()
        except OSError as exc:
            if exc.errno != errno.EBADF:
                raise
3389
def test_interrupted_write_unbuffered(self):
    # Binary mode with buffering disabled.
    open_options = {"mode": "wb", "buffering": 0}
    self.check_interrupted_write(b"xy", b"xy", **open_options)
3392
def test_interrupted_write_buffered(self):
    # Binary mode with the default buffering.
    open_options = {"mode": "wb"}
    self.check_interrupted_write(b"xy", b"xy", **open_options)
3395
def test_interrupted_write_text(self):
    # Text mode: str is written, ASCII-encoded bytes are expected.
    open_options = {"mode": "w", "encoding": "ascii"}
    self.check_interrupted_write("xy", b"xy", **open_options)
3398
@support.no_tracing
def check_reentrant_write(self, data, **fdopen_kwargs):
    """Check the outcome of a write() re-entered from a signal handler.

    A file object is opened over the write end of a pipe with
    *fdopen_kwargs* and *data* is written in a loop until the SIGALRM
    handler, which itself writes to the same object, interrupts it.
    """
    def on_alarm(*args):
        # Will be called reentrantly from the same thread
        wio.write(data)
        1/0
    signal.signal(signal.SIGALRM, on_alarm)
    r, w = os.pipe()
    wio = self.io.open(w, **fdopen_kwargs)
    try:
        signal.alarm(1)
        # Either the reentrant call to wio.write() fails with RuntimeError,
        # or the signal handler raises ZeroDivisionError.
        with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
            while 1:
                for i in range(100):
                    wio.write(data)
                    wio.flush()
                # Make sure the buffer doesn't fill up and block further writes
                os.read(r, len(data) * 100)
        exc = cm.exception
        if isinstance(exc, RuntimeError):
            self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
    finally:
        # Cancel the alarm in case the loop ended before it fired;
        # otherwise it would go off after wio has been closed.
        signal.alarm(0)
        wio.close()
        os.close(r)
3425
def test_reentrant_write_buffered(self):
    # Binary mode with the default buffering.
    open_options = {"mode": "wb"}
    self.check_reentrant_write(b"xy", **open_options)
3428
def test_reentrant_write_text(self):
    # Text mode with ASCII encoding.
    open_options = {"mode": "w", "encoding": "ascii"}
    self.check_reentrant_write("xy", **open_options)
3431
def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
    """Check that a buffered read, when it gets interrupted (either
    returning a partial result or EINTR), properly invokes the signal
    handler and retries if the latter returned successfully.

    *decode* converts the value returned by read() to str for the
    comparison; *fdopen_kwargs* are passed to ``self.io.open``.
    """
    r, w = os.pipe()
    fdopen_kwargs["closefd"] = False
    def alarm_handler(sig, frame):
        # Supplies the second half of the data while read() is waiting.
        os.write(w, b"bar")
    signal.signal(signal.SIGALRM, alarm_handler)
    # Bound before the try block so that the cleanup below cannot fail
    # with a NameError when open() itself raises.
    rio = None
    try:
        rio = self.io.open(r, **fdopen_kwargs)
        os.write(w, b"foo")
        signal.alarm(1)
        # Expected behaviour:
        # - first raw read() returns partial b"foo"
        # - second raw read() returns EINTR
        # - third raw read() returns b"bar"
        self.assertEqual(decode(rio.read(6)), "foobar")
    finally:
        # Cancel the alarm before closing the pipe: if the read failed
        # early, the handler must not fire later and write to a closed
        # descriptor.
        signal.alarm(0)
        if rio is not None:
            rio.close()
        os.close(w)
        os.close(r)
3454
def test_interrupted_read_retry_buffered(self):
    # Binary mode: read() returns bytes, decoded as latin1 for comparison.
    def to_text(data):
        return data.decode('latin1')
    self.check_interrupted_read_retry(to_text, mode="rb")
3458
Antoine Pitrou20db5112011-08-19 20:32:34 +02003459 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003460 self.check_interrupted_read_retry(lambda x: x,
3461 mode="r")
3462
@unittest.skipUnless(threading, 'Threading required for this test.')
def check_interrupted_write_retry(self, item, **fdopen_kwargs):
    """Check that a buffered write, when it gets interrupted (either
    returning a partial result or EINTR), properly invokes the signal
    handler and retries if the latter returned successfully.

    *item* is a one-element ``bytes`` or ``str`` that is repeated
    ``support.PIPE_MAX_SIZE`` times to form the payload.  *fdopen_kwargs*
    are forwarded to ``self.io.open()``; ``closefd`` is always forced to
    False.
    """
    select = support.import_module("select")
    # A quantity that exceeds the buffer size of an anonymous pipe's
    # write end.
    N = support.PIPE_MAX_SIZE
    r, w = os.pipe()
    fdopen_kwargs["closefd"] = False
    # We need a separate thread to read from the pipe and allow the
    # write() to finish.  This thread is started after the SIGALRM is
    # received (forcing a first EINTR in write()).
    read_results = []
    write_finished = False
    def _read():
        # Drain the pipe until the main thread flags completion; the
        # one-second select() timeout lets the flag be re-checked.
        while not write_finished:
            while r in select.select([r], [], [], 1.0)[0]:
                s = os.read(r, 1024)
                read_results.append(s)
    t = threading.Thread(target=_read)
    t.daemon = True
    def alarm1(sig, frame):
        # First alarm: re-arm with the second handler.
        signal.signal(signal.SIGALRM, alarm2)
        signal.alarm(1)
    def alarm2(sig, frame):
        # Second alarm: start the reader so the write can make progress.
        t.start()
    signal.signal(signal.SIGALRM, alarm1)
    wio = None
    try:
        wio = self.io.open(w, **fdopen_kwargs)
        signal.alarm(1)
        # Expected behaviour:
        # - first raw write() is partial (because of the limited pipe buffer
        #   and the first alarm)
        # - second raw write() returns EINTR (because of the second alarm)
        # - subsequent write()s are successful (either partial or complete)
        self.assertEqual(N, wio.write(item * N))
        wio.flush()
        write_finished = True
        t.join()
        self.assertEqual(N, sum(len(x) for x in read_results))
    finally:
        # Cancel any alarm still pending (e.g. when the body failed
        # early) so neither handler can fire after this check returns.
        signal.alarm(0)
        write_finished = True
        os.close(w)
        os.close(r)
        # This is deliberate. If we didn't close the file descriptor
        # before closing wio, wio would try to flush its internal
        # buffer, and could block (in case of failure).
        # ``wio`` is still None if open() itself raised; skipping close()
        # then keeps the original exception from being masked.
        if wio is not None:
            try:
                wio.close()
            except OSError as e:
                if e.errno != errno.EBADF:
                    raise
3517
Antoine Pitrou20db5112011-08-19 20:32:34 +02003518 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003519 self.check_interrupted_write_retry(b"x", mode="wb")
3520
Antoine Pitrou20db5112011-08-19 20:32:34 +02003521 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003522 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3523
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003524
class CSignalsTest(SignalsTest):
    """Run the SignalsTest checks against the ``io`` module."""

    # Implementation under test; the inherited helpers reach it through
    # ``self.io``.
    io = io
3527
class PySignalsTest(SignalsTest):
    """Run the SignalsTest checks against the ``pyio`` module."""

    # Implementation under test; the inherited helpers reach it through
    # ``self.io``.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = test_reentrant_write_text = None
3535
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003536
def load_tests(*args):
    """Build and return the test suite for this module.

    Before the suite is assembled, every test class whose name starts
    with "C" or "Py" receives, as class attributes, the public names of
    the matching io implementation (``io`` or ``pyio``) together with the
    matching "C"/"Py" variants of the mock classes.  Classes with any
    other prefix are left untouched.

    The positional arguments are accepted for compatibility with the
    unittest ``load_tests`` protocol and are otherwise ignored.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name: getattr(io, name) for name in all_members}
    py_io_ns = {name: getattr(pyio, name) for name in all_members}
    globs = globals()
    # Each mock is exposed under its unprefixed name but resolved to the
    # "C..." / "Py..." variant found in this module's globals.
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            for name, obj in c_io_ns.items():
                setattr(test, name, obj)
        elif test.__name__.startswith("Py"):
            for name, obj in py_io_ns.items():
                setattr(test, name, obj)

    # unittest.makeSuite() is deprecated and has been removed from recent
    # Python versions; the default loader collects the same "test*"
    # methods from each class.
    load_case = unittest.defaultTestLoader.loadTestsFromTestCase
    suite = unittest.TestSuite([load_case(test) for test in tests])
    return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00003572
# Allow this test module to be executed directly as a script.
if __name__ == "__main__":
    unittest.main()