blob: e5474f0b8f1166cf025678f8ac854427983a9c0f [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Serhiy Storchaka78980432013-01-15 01:12:17 +020035import _testcapi
Antoine Pitrou131a4892012-10-16 22:57:11 +020036from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020037from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000038from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010047try:
48 import fcntl
49except ImportError:
50 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000051
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000052def _default_chunk_size():
53 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000054 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000055 return f._CHUNK_SIZE
56
57
Antoine Pitrou328ec742010-09-14 18:37:24 +000058class MockRawIOWithoutRead:
59 """A RawIO implementation without read(), so as to exercise the default
60 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000061
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000062 def __init__(self, read_stack=()):
63 self._read_stack = list(read_stack)
64 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000065 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000066 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000067
Guido van Rossum01a27522007-03-07 01:00:12 +000068 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000069 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000070 return len(b)
71
72 def writable(self):
73 return True
74
Guido van Rossum68bbcd22007-02-27 17:19:33 +000075 def fileno(self):
76 return 42
77
78 def readable(self):
79 return True
80
Guido van Rossum01a27522007-03-07 01:00:12 +000081 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000082 return True
83
Guido van Rossum01a27522007-03-07 01:00:12 +000084 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000085 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000086
87 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000088 return 0 # same comment as above
89
90 def readinto(self, buf):
91 self._reads += 1
92 max_len = len(buf)
93 try:
94 data = self._read_stack[0]
95 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +000096 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000097 return 0
98 if data is None:
99 del self._read_stack[0]
100 return None
101 n = len(data)
102 if len(data) <= max_len:
103 del self._read_stack[0]
104 buf[:n] = data
105 return n
106 else:
107 buf[:] = data[:max_len]
108 self._read_stack[0] = data[max_len:]
109 return max_len
110
111 def truncate(self, pos=None):
112 return pos
113
Antoine Pitrou328ec742010-09-14 18:37:24 +0000114class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
115 pass
116
117class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
118 pass
119
120
121class MockRawIO(MockRawIOWithoutRead):
122
123 def read(self, n=None):
124 self._reads += 1
125 try:
126 return self._read_stack.pop(0)
127 except:
128 self._extraneous_reads += 1
129 return b""
130
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000131class CMockRawIO(MockRawIO, io.RawIOBase):
132 pass
133
134class PyMockRawIO(MockRawIO, pyio.RawIOBase):
135 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000136
Guido van Rossuma9e20242007-03-08 00:43:48 +0000137
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000138class MisbehavedRawIO(MockRawIO):
139 def write(self, b):
140 return super().write(b) * 2
141
142 def read(self, n=None):
143 return super().read(n) * 2
144
145 def seek(self, pos, whence):
146 return -123
147
148 def tell(self):
149 return -456
150
151 def readinto(self, buf):
152 super().readinto(buf)
153 return len(buf) * 5
154
155class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
156 pass
157
158class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
159 pass
160
161
162class CloseFailureIO(MockRawIO):
163 closed = 0
164
165 def close(self):
166 if not self.closed:
167 self.closed = 1
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200168 raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000169
170class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
171 pass
172
173class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
174 pass
175
176
177class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000178
179 def __init__(self, data):
180 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000181 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000182
183 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000184 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000185 self.read_history.append(None if res is None else len(res))
186 return res
187
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000188 def readinto(self, b):
189 res = super().readinto(b)
190 self.read_history.append(res)
191 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000192
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000193class CMockFileIO(MockFileIO, io.BytesIO):
194 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000195
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000196class PyMockFileIO(MockFileIO, pyio.BytesIO):
197 pass
198
199
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000200class MockUnseekableIO:
201 def seekable(self):
202 return False
203
204 def seek(self, *args):
205 raise self.UnsupportedOperation("not seekable")
206
207 def tell(self, *args):
208 raise self.UnsupportedOperation("not seekable")
209
210class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
211 UnsupportedOperation = io.UnsupportedOperation
212
213class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
214 UnsupportedOperation = pyio.UnsupportedOperation
215
216
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000217class MockNonBlockWriterIO:
218
219 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000220 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000221 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000222
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000223 def pop_written(self):
224 s = b"".join(self._write_stack)
225 self._write_stack[:] = []
226 return s
227
228 def block_on(self, char):
229 """Block when a given char is encountered."""
230 self._blocker_char = char
231
232 def readable(self):
233 return True
234
235 def seekable(self):
236 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000237
Guido van Rossum01a27522007-03-07 01:00:12 +0000238 def writable(self):
239 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000240
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000241 def write(self, b):
242 b = bytes(b)
243 n = -1
244 if self._blocker_char:
245 try:
246 n = b.index(self._blocker_char)
247 except ValueError:
248 pass
249 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100250 if n > 0:
251 # write data up to the first blocker
252 self._write_stack.append(b[:n])
253 return n
254 else:
255 # cancel blocker and indicate would block
256 self._blocker_char = None
257 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000258 self._write_stack.append(b)
259 return len(b)
260
261class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
262 BlockingIOError = io.BlockingIOError
263
264class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
265 BlockingIOError = pyio.BlockingIOError
266
Guido van Rossuma9e20242007-03-08 00:43:48 +0000267
Guido van Rossum28524c72007-02-27 05:47:44 +0000268class IOTest(unittest.TestCase):
269
Neal Norwitze7789b12008-03-24 06:18:09 +0000270 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000271 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000272
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000273 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000274 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000275
Guido van Rossum28524c72007-02-27 05:47:44 +0000276 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000277 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000278 f.truncate(0)
279 self.assertEqual(f.tell(), 5)
280 f.seek(0)
281
282 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000283 self.assertEqual(f.seek(0), 0)
284 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000285 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000286 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000287 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000288 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000289 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000290 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000291 self.assertEqual(f.seek(-1, 2), 13)
292 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000293
Guido van Rossum87429772007-04-10 21:06:59 +0000294 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000295 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000296 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000297
Guido van Rossum9b76da62007-04-11 01:09:03 +0000298 def read_ops(self, f, buffered=False):
299 data = f.read(5)
300 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000301 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000302 self.assertEqual(f.readinto(data), 5)
303 self.assertEqual(data, b" worl")
304 self.assertEqual(f.readinto(data), 2)
305 self.assertEqual(len(data), 5)
306 self.assertEqual(data[:2], b"d\n")
307 self.assertEqual(f.seek(0), 0)
308 self.assertEqual(f.read(20), b"hello world\n")
309 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000310 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000311 self.assertEqual(f.seek(-6, 2), 6)
312 self.assertEqual(f.read(5), b"world")
313 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000314 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000315 self.assertEqual(f.seek(-6, 1), 5)
316 self.assertEqual(f.read(5), b" worl")
317 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000318 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000319 if buffered:
320 f.seek(0)
321 self.assertEqual(f.read(), b"hello world\n")
322 f.seek(6)
323 self.assertEqual(f.read(), b"world\n")
324 self.assertEqual(f.read(), b"")
325
Guido van Rossum34d69e52007-04-10 20:08:41 +0000326 LARGE = 2**31
327
Guido van Rossum53807da2007-04-10 19:01:47 +0000328 def large_file_ops(self, f):
329 assert f.readable()
330 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000331 self.assertEqual(f.seek(self.LARGE), self.LARGE)
332 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000333 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000334 self.assertEqual(f.tell(), self.LARGE + 3)
335 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000336 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000337 self.assertEqual(f.tell(), self.LARGE + 2)
338 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000339 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000340 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000341 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
342 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000343 self.assertEqual(f.read(2), b"x")
344
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000345 def test_invalid_operations(self):
346 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000347 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000348 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000349 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000350 self.assertRaises(exc, fp.read)
351 self.assertRaises(exc, fp.readline)
352 with self.open(support.TESTFN, "wb", buffering=0) as fp:
353 self.assertRaises(exc, fp.read)
354 self.assertRaises(exc, fp.readline)
355 with self.open(support.TESTFN, "rb", buffering=0) as fp:
356 self.assertRaises(exc, fp.write, b"blah")
357 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000358 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000359 self.assertRaises(exc, fp.write, b"blah")
360 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000361 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000362 self.assertRaises(exc, fp.write, "blah")
363 self.assertRaises(exc, fp.writelines, ["blah\n"])
364 # Non-zero seeking from current or end pos
365 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
366 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000367
Antoine Pitrou13348842012-01-29 18:36:34 +0100368 def test_open_handles_NUL_chars(self):
369 fn_with_NUL = 'foo\0bar'
370 self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
371 self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')
372
Guido van Rossum28524c72007-02-27 05:47:44 +0000373 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000374 with self.open(support.TESTFN, "wb", buffering=0) as f:
375 self.assertEqual(f.readable(), False)
376 self.assertEqual(f.writable(), True)
377 self.assertEqual(f.seekable(), True)
378 self.write_ops(f)
379 with self.open(support.TESTFN, "rb", buffering=0) as f:
380 self.assertEqual(f.readable(), True)
381 self.assertEqual(f.writable(), False)
382 self.assertEqual(f.seekable(), True)
383 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000384
Guido van Rossum87429772007-04-10 21:06:59 +0000385 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000386 with self.open(support.TESTFN, "wb") as f:
387 self.assertEqual(f.readable(), False)
388 self.assertEqual(f.writable(), True)
389 self.assertEqual(f.seekable(), True)
390 self.write_ops(f)
391 with self.open(support.TESTFN, "rb") as f:
392 self.assertEqual(f.readable(), True)
393 self.assertEqual(f.writable(), False)
394 self.assertEqual(f.seekable(), True)
395 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000396
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000397 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000398 with self.open(support.TESTFN, "wb") as f:
399 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
400 with self.open(support.TESTFN, "rb") as f:
401 self.assertEqual(f.readline(), b"abc\n")
402 self.assertEqual(f.readline(10), b"def\n")
403 self.assertEqual(f.readline(2), b"xy")
404 self.assertEqual(f.readline(4), b"zzy\n")
405 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000406 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000407 self.assertRaises(TypeError, f.readline, 5.3)
408 with self.open(support.TESTFN, "r") as f:
409 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000410
Guido van Rossum28524c72007-02-27 05:47:44 +0000411 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000412 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000413 self.write_ops(f)
414 data = f.getvalue()
415 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000416 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000417 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000418
Guido van Rossum53807da2007-04-10 19:01:47 +0000419 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000420 # On Windows and Mac OSX this test comsumes large resources; It takes
421 # a long time to build the >2GB file and takes >2GB of disk space
422 # therefore the resource must be enabled to run this test.
423 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000424 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000425 print("\nTesting large file ops skipped on %s." % sys.platform,
426 file=sys.stderr)
427 print("It requires %d bytes and a long time." % self.LARGE,
428 file=sys.stderr)
429 print("Use 'regrtest.py -u largefile test_io' to run it.",
430 file=sys.stderr)
431 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000432 with self.open(support.TESTFN, "w+b", 0) as f:
433 self.large_file_ops(f)
434 with self.open(support.TESTFN, "w+b") as f:
435 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000436
437 def test_with_open(self):
438 for bufsize in (0, 1, 100):
439 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000440 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000441 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000442 self.assertEqual(f.closed, True)
443 f = None
444 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000445 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000446 1/0
447 except ZeroDivisionError:
448 self.assertEqual(f.closed, True)
449 else:
450 self.fail("1/0 didn't raise an exception")
451
Antoine Pitrou08838b62009-01-21 00:55:13 +0000452 # issue 5008
453 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000454 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000455 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000456 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000457 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000458 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000459 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000460 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000461 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000462
Guido van Rossum87429772007-04-10 21:06:59 +0000463 def test_destructor(self):
464 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000465 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000466 def __del__(self):
467 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000468 try:
469 f = super().__del__
470 except AttributeError:
471 pass
472 else:
473 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000474 def close(self):
475 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000476 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000477 def flush(self):
478 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000479 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000480 with support.check_warnings(('', ResourceWarning)):
481 f = MyFileIO(support.TESTFN, "wb")
482 f.write(b"xxx")
483 del f
484 support.gc_collect()
485 self.assertEqual(record, [1, 2, 3])
486 with self.open(support.TESTFN, "rb") as f:
487 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000488
489 def _check_base_destructor(self, base):
490 record = []
491 class MyIO(base):
492 def __init__(self):
493 # This exercises the availability of attributes on object
494 # destruction.
495 # (in the C version, close() is called by the tp_dealloc
496 # function, not by __del__)
497 self.on_del = 1
498 self.on_close = 2
499 self.on_flush = 3
500 def __del__(self):
501 record.append(self.on_del)
502 try:
503 f = super().__del__
504 except AttributeError:
505 pass
506 else:
507 f()
508 def close(self):
509 record.append(self.on_close)
510 super().close()
511 def flush(self):
512 record.append(self.on_flush)
513 super().flush()
514 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000515 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000516 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000517 self.assertEqual(record, [1, 2, 3])
518
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000519 def test_IOBase_destructor(self):
520 self._check_base_destructor(self.IOBase)
521
522 def test_RawIOBase_destructor(self):
523 self._check_base_destructor(self.RawIOBase)
524
525 def test_BufferedIOBase_destructor(self):
526 self._check_base_destructor(self.BufferedIOBase)
527
528 def test_TextIOBase_destructor(self):
529 self._check_base_destructor(self.TextIOBase)
530
Guido van Rossum87429772007-04-10 21:06:59 +0000531 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000532 with self.open(support.TESTFN, "wb") as f:
533 f.write(b"xxx")
534 with self.open(support.TESTFN, "rb") as f:
535 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000536
Guido van Rossumd4103952007-04-12 05:44:49 +0000537 def test_array_writes(self):
538 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000539 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000540 with self.open(support.TESTFN, "wb", 0) as f:
541 self.assertEqual(f.write(a), n)
542 with self.open(support.TESTFN, "wb") as f:
543 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000544
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000545 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000546 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000547 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000548
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000549 def test_read_closed(self):
550 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000551 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000552 with self.open(support.TESTFN, "r") as f:
553 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000554 self.assertEqual(file.read(), "egg\n")
555 file.seek(0)
556 file.close()
557 self.assertRaises(ValueError, file.read)
558
559 def test_no_closefd_with_filename(self):
560 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000561 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000562
563 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000564 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000565 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000566 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000567 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000568 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000569 self.assertEqual(file.buffer.raw.closefd, False)
570
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000571 def test_garbage_collection(self):
572 # FileIO objects are collected, and collecting them flushes
573 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000574 with support.check_warnings(('', ResourceWarning)):
575 f = self.FileIO(support.TESTFN, "wb")
576 f.write(b"abcxxx")
577 f.f = f
578 wr = weakref.ref(f)
579 del f
580 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000581 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000582 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000583 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000584
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000585 def test_unbounded_file(self):
586 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
587 zero = "/dev/zero"
588 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000589 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000590 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000591 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000592 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000593 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000594 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000595 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000596 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000597 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000598 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000599 self.assertRaises(OverflowError, f.read)
600
Antoine Pitrou6be88762010-05-03 16:48:20 +0000601 def test_flush_error_on_close(self):
602 f = self.open(support.TESTFN, "wb", buffering=0)
603 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200604 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000605 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200606 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600607 self.assertTrue(f.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000608
609 def test_multi_close(self):
610 f = self.open(support.TESTFN, "wb", buffering=0)
611 f.close()
612 f.close()
613 f.close()
614 self.assertRaises(ValueError, f.flush)
615
Antoine Pitrou328ec742010-09-14 18:37:24 +0000616 def test_RawIOBase_read(self):
617 # Exercise the default RawIOBase.read() implementation (which calls
618 # readinto() internally).
619 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
620 self.assertEqual(rawio.read(2), b"ab")
621 self.assertEqual(rawio.read(2), b"c")
622 self.assertEqual(rawio.read(2), b"d")
623 self.assertEqual(rawio.read(2), None)
624 self.assertEqual(rawio.read(2), b"ef")
625 self.assertEqual(rawio.read(2), b"g")
626 self.assertEqual(rawio.read(2), None)
627 self.assertEqual(rawio.read(2), b"")
628
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400629 def test_types_have_dict(self):
630 test = (
631 self.IOBase(),
632 self.RawIOBase(),
633 self.TextIOBase(),
634 self.StringIO(),
635 self.BytesIO()
636 )
637 for obj in test:
638 self.assertTrue(hasattr(obj, "__dict__"))
639
Ross Lagerwall59142db2011-10-31 20:34:46 +0200640 def test_opener(self):
641 with self.open(support.TESTFN, "w") as f:
642 f.write("egg\n")
643 fd = os.open(support.TESTFN, os.O_RDONLY)
644 def opener(path, flags):
645 return fd
646 with self.open("non-existent", "r", opener=opener) as f:
647 self.assertEqual(f.read(), "egg\n")
648
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200649 def test_fileio_closefd(self):
650 # Issue #4841
651 with self.open(__file__, 'rb') as f1, \
652 self.open(__file__, 'rb') as f2:
653 fileio = self.FileIO(f1.fileno(), closefd=False)
654 # .__init__() must not close f1
655 fileio.__init__(f2.fileno(), closefd=False)
656 f1.readline()
657 # .close() must not close f2
658 fileio.close()
659 f2.readline()
660
661
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000662class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +0200663
664 def test_IOBase_finalize(self):
665 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
666 # class which inherits IOBase and an object of this class are caught
667 # in a reference cycle and close() is already in the method cache.
668 class MyIO(self.IOBase):
669 def close(self):
670 pass
671
672 # create an instance to populate the method cache
673 MyIO()
674 obj = MyIO()
675 obj.obj = obj
676 wr = weakref.ref(obj)
677 del MyIO
678 del obj
679 support.gc_collect()
680 self.assertTrue(wr() is None, wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000681
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000682class PyIOTest(IOTest):
683 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000684
Guido van Rossuma9e20242007-03-08 00:43:48 +0000685
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000686class CommonBufferedTests:
687 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
688
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000689 def test_detach(self):
690 raw = self.MockRawIO()
691 buf = self.tp(raw)
692 self.assertIs(buf.detach(), raw)
693 self.assertRaises(ValueError, buf.detach)
694
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000695 def test_fileno(self):
696 rawio = self.MockRawIO()
697 bufio = self.tp(rawio)
698
Ezio Melottib3aedd42010-11-20 19:04:17 +0000699 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000700
701 def test_no_fileno(self):
702 # XXX will we always have fileno() function? If so, kill
703 # this test. Else, write it.
704 pass
705
706 def test_invalid_args(self):
707 rawio = self.MockRawIO()
708 bufio = self.tp(rawio)
709 # Invalid whence
710 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +0200711 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000712
713 def test_override_destructor(self):
714 tp = self.tp
715 record = []
716 class MyBufferedIO(tp):
717 def __del__(self):
718 record.append(1)
719 try:
720 f = super().__del__
721 except AttributeError:
722 pass
723 else:
724 f()
725 def close(self):
726 record.append(2)
727 super().close()
728 def flush(self):
729 record.append(3)
730 super().flush()
731 rawio = self.MockRawIO()
732 bufio = MyBufferedIO(rawio)
733 writable = bufio.writable()
734 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000735 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000736 if writable:
737 self.assertEqual(record, [1, 2, 3])
738 else:
739 self.assertEqual(record, [1, 2])
740
741 def test_context_manager(self):
742 # Test usability as a context manager
743 rawio = self.MockRawIO()
744 bufio = self.tp(rawio)
745 def _with():
746 with bufio:
747 pass
748 _with()
749 # bufio should now be closed, and using it a second time should raise
750 # a ValueError.
751 self.assertRaises(ValueError, _with)
752
753 def test_error_through_destructor(self):
754 # Test that the exception state is not modified by a destructor,
755 # even if close() fails.
756 rawio = self.CloseFailureIO()
757 def f():
758 self.tp(rawio).xyzzy
759 with support.captured_output("stderr") as s:
760 self.assertRaises(AttributeError, f)
761 s = s.getvalue().strip()
762 if s:
763 # The destructor *may* have printed an unraisable error, check it
764 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200765 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000766 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000767
Antoine Pitrou716c4442009-05-23 19:04:03 +0000768 def test_repr(self):
769 raw = self.MockRawIO()
770 b = self.tp(raw)
771 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
772 self.assertEqual(repr(b), "<%s>" % clsname)
773 raw.name = "dummy"
774 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
775 raw.name = b"dummy"
776 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
777
Antoine Pitrou6be88762010-05-03 16:48:20 +0000778 def test_flush_error_on_close(self):
779 raw = self.MockRawIO()
780 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200781 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000782 raw.flush = bad_flush
783 b = self.tp(raw)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200784 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600785 self.assertTrue(b.closed)
786
787 def test_close_error_on_close(self):
788 raw = self.MockRawIO()
789 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200790 raise OSError('flush')
Benjamin Peterson68623612012-12-20 11:53:11 -0600791 def bad_close():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200792 raise OSError('close')
Benjamin Peterson68623612012-12-20 11:53:11 -0600793 raw.close = bad_close
794 b = self.tp(raw)
795 b.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200796 with self.assertRaises(OSError) as err: # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600797 b.close()
798 self.assertEqual(err.exception.args, ('close',))
799 self.assertEqual(err.exception.__context__.args, ('flush',))
800 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000801
802 def test_multi_close(self):
803 raw = self.MockRawIO()
804 b = self.tp(raw)
805 b.close()
806 b.close()
807 b.close()
808 self.assertRaises(ValueError, b.flush)
809
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000810 def test_unseekable(self):
811 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
812 self.assertRaises(self.UnsupportedOperation, bufio.tell)
813 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
814
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000815 def test_readonly_attributes(self):
816 raw = self.MockRawIO()
817 buf = self.tp(raw)
818 x = self.MockRawIO()
819 with self.assertRaises(AttributeError):
820 buf.raw = x
821
Guido van Rossum78892e42007-04-06 17:31:18 +0000822
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200823class SizeofTest:
824
825 @support.cpython_only
826 def test_sizeof(self):
827 bufsize1 = 4096
828 bufsize2 = 8192
829 rawio = self.MockRawIO()
830 bufio = self.tp(rawio, buffer_size=bufsize1)
831 size = sys.getsizeof(bufio) - bufsize1
832 rawio = self.MockRawIO()
833 bufio = self.tp(rawio, buffer_size=bufsize2)
834 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
835
Jesus Ceadc469452012-10-04 12:37:56 +0200836 @support.cpython_only
837 def test_buffer_freeing(self) :
838 bufsize = 4096
839 rawio = self.MockRawIO()
840 bufio = self.tp(rawio, buffer_size=bufsize)
841 size = sys.getsizeof(bufio) - bufsize
842 bufio.close()
843 self.assertEqual(sys.getsizeof(bufio), size)
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200844
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000845class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
846 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000847
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000848 def test_constructor(self):
849 rawio = self.MockRawIO([b"abc"])
850 bufio = self.tp(rawio)
851 bufio.__init__(rawio)
852 bufio.__init__(rawio, buffer_size=1024)
853 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000854 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000855 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
856 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
857 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
858 rawio = self.MockRawIO([b"abc"])
859 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000860 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000861
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000862 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000863 for arg in (None, 7):
864 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
865 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000866 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000867 # Invalid args
868 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000869
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000870 def test_read1(self):
871 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
872 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000873 self.assertEqual(b"a", bufio.read(1))
874 self.assertEqual(b"b", bufio.read1(1))
875 self.assertEqual(rawio._reads, 1)
876 self.assertEqual(b"c", bufio.read1(100))
877 self.assertEqual(rawio._reads, 1)
878 self.assertEqual(b"d", bufio.read1(100))
879 self.assertEqual(rawio._reads, 2)
880 self.assertEqual(b"efg", bufio.read1(100))
881 self.assertEqual(rawio._reads, 3)
882 self.assertEqual(b"", bufio.read1(100))
883 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000884 # Invalid args
885 self.assertRaises(ValueError, bufio.read1, -1)
886
887 def test_readinto(self):
888 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
889 bufio = self.tp(rawio)
890 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000891 self.assertEqual(bufio.readinto(b), 2)
892 self.assertEqual(b, b"ab")
893 self.assertEqual(bufio.readinto(b), 2)
894 self.assertEqual(b, b"cd")
895 self.assertEqual(bufio.readinto(b), 2)
896 self.assertEqual(b, b"ef")
897 self.assertEqual(bufio.readinto(b), 1)
898 self.assertEqual(b, b"gf")
899 self.assertEqual(bufio.readinto(b), 0)
900 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +0200901 rawio = self.MockRawIO((b"abc", None))
902 bufio = self.tp(rawio)
903 self.assertEqual(bufio.readinto(b), 2)
904 self.assertEqual(b, b"ab")
905 self.assertEqual(bufio.readinto(b), 1)
906 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000907
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000908 def test_readlines(self):
909 def bufio():
910 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
911 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000912 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
913 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
914 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000915
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000916 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000917 data = b"abcdefghi"
918 dlen = len(data)
919
920 tests = [
921 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
922 [ 100, [ 3, 3, 3], [ dlen ] ],
923 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
924 ]
925
926 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000927 rawio = self.MockFileIO(data)
928 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000929 pos = 0
930 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +0000931 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000932 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000933 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +0000934 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000935
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000936 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000937 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000938 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
939 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000940 self.assertEqual(b"abcd", bufio.read(6))
941 self.assertEqual(b"e", bufio.read(1))
942 self.assertEqual(b"fg", bufio.read())
943 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +0200944 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000945 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000946
Victor Stinnera80987f2011-05-25 22:47:16 +0200947 rawio = self.MockRawIO((b"a", None, None))
948 self.assertEqual(b"a", rawio.readall())
949 self.assertIsNone(rawio.readall())
950
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000951 def test_read_past_eof(self):
952 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
953 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000954
Ezio Melottib3aedd42010-11-20 19:04:17 +0000955 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000956
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000957 def test_read_all(self):
958 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
959 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000960
Ezio Melottib3aedd42010-11-20 19:04:17 +0000961 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000962
Victor Stinner45df8202010-04-28 22:31:17 +0000963 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +0000964 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000965 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000966 try:
967 # Write out many bytes with exactly the same number of 0's,
968 # 1's... 255's. This will help us check that concurrent reading
969 # doesn't duplicate or forget contents.
970 N = 1000
971 l = list(range(256)) * N
972 random.shuffle(l)
973 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000974 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +0000975 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000976 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000977 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000978 errors = []
979 results = []
980 def f():
981 try:
982 # Intra-buffer read then buffer-flushing read
983 for n in cycle([1, 19]):
984 s = bufio.read(n)
985 if not s:
986 break
987 # list.append() is atomic
988 results.append(s)
989 except Exception as e:
990 errors.append(e)
991 raise
992 threads = [threading.Thread(target=f) for x in range(20)]
993 for t in threads:
994 t.start()
995 time.sleep(0.02) # yield
996 for t in threads:
997 t.join()
998 self.assertFalse(errors,
999 "the following exceptions were caught: %r" % errors)
1000 s = b''.join(results)
1001 for i in range(256):
1002 c = bytes(bytearray([i]))
1003 self.assertEqual(s.count(c), N)
1004 finally:
1005 support.unlink(support.TESTFN)
1006
Antoine Pitrou1e44fec2011-10-04 12:26:20 +02001007 def test_unseekable(self):
1008 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1009 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1010 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1011 bufio.read(1)
1012 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1013 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1014
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001015 def test_misbehaved_io(self):
1016 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1017 bufio = self.tp(rawio)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001018 self.assertRaises(OSError, bufio.seek, 0)
1019 self.assertRaises(OSError, bufio.tell)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001020
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001021 def test_no_extraneous_read(self):
1022 # Issue #9550; when the raw IO object has satisfied the read request,
1023 # we should not issue any additional reads, otherwise it may block
1024 # (e.g. socket).
1025 bufsize = 16
1026 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1027 rawio = self.MockRawIO([b"x" * n])
1028 bufio = self.tp(rawio, bufsize)
1029 self.assertEqual(bufio.read(n), b"x" * n)
1030 # Simple case: one raw read is enough to satisfy the request.
1031 self.assertEqual(rawio._extraneous_reads, 0,
1032 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1033 # A more complex case where two raw reads are needed to satisfy
1034 # the request.
1035 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1036 bufio = self.tp(rawio, bufsize)
1037 self.assertEqual(bufio.read(n), b"x" * n)
1038 self.assertEqual(rawio._extraneous_reads, 0,
1039 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1040
1041
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001042class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001043 tp = io.BufferedReader
1044
1045 def test_constructor(self):
1046 BufferedReaderTest.test_constructor(self)
1047 # The allocation can succeed on 32-bit builds, e.g. with more
1048 # than 2GB RAM and a 64-bit kernel.
1049 if sys.maxsize > 0x7FFFFFFF:
1050 rawio = self.MockRawIO()
1051 bufio = self.tp(rawio)
1052 self.assertRaises((OverflowError, MemoryError, ValueError),
1053 bufio.__init__, rawio, sys.maxsize)
1054
1055 def test_initialization(self):
1056 rawio = self.MockRawIO([b"abc"])
1057 bufio = self.tp(rawio)
1058 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1059 self.assertRaises(ValueError, bufio.read)
1060 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1061 self.assertRaises(ValueError, bufio.read)
1062 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1063 self.assertRaises(ValueError, bufio.read)
1064
1065 def test_misbehaved_io_read(self):
1066 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1067 bufio = self.tp(rawio)
1068 # _pyio.BufferedReader seems to implement reading different, so that
1069 # checking this is not so easy.
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001070 self.assertRaises(OSError, bufio.read, 10)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001071
1072 def test_garbage_collection(self):
1073 # C BufferedReader objects are collected.
1074 # The Python version has __del__, so it ends into gc.garbage instead
1075 rawio = self.FileIO(support.TESTFN, "w+b")
1076 f = self.tp(rawio)
1077 f.f = f
1078 wr = weakref.ref(f)
1079 del f
Benjamin Peterson45cec322009-04-24 23:14:50 +00001080 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001081 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001082
R David Murray67bfe802013-02-23 21:51:05 -05001083 def test_args_error(self):
1084 # Issue #17275
1085 with self.assertRaisesRegex(TypeError, "BufferedReader"):
1086 self.tp(io.BytesIO(), 1024, 1024, 1024)
1087
1088
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001089class PyBufferedReaderTest(BufferedReaderTest):
1090 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001091
Guido van Rossuma9e20242007-03-08 00:43:48 +00001092
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001093class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1094 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001095
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001096 def test_constructor(self):
1097 rawio = self.MockRawIO()
1098 bufio = self.tp(rawio)
1099 bufio.__init__(rawio)
1100 bufio.__init__(rawio, buffer_size=1024)
1101 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001102 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001103 bufio.flush()
1104 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1105 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1106 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1107 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001108 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001109 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001110 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001111
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001112 def test_detach_flush(self):
1113 raw = self.MockRawIO()
1114 buf = self.tp(raw)
1115 buf.write(b"howdy!")
1116 self.assertFalse(raw._write_stack)
1117 buf.detach()
1118 self.assertEqual(raw._write_stack, [b"howdy!"])
1119
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001120 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001121 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001122 writer = self.MockRawIO()
1123 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001124 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001125 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001126
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001127 def test_write_overflow(self):
1128 writer = self.MockRawIO()
1129 bufio = self.tp(writer, 8)
1130 contents = b"abcdefghijklmnop"
1131 for n in range(0, len(contents), 3):
1132 bufio.write(contents[n:n+3])
1133 flushed = b"".join(writer._write_stack)
1134 # At least (total - 8) bytes were implicitly flushed, perhaps more
1135 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001136 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001137
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001138 def check_writes(self, intermediate_func):
1139 # Lots of writes, test the flushed output is as expected.
1140 contents = bytes(range(256)) * 1000
1141 n = 0
1142 writer = self.MockRawIO()
1143 bufio = self.tp(writer, 13)
1144 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1145 def gen_sizes():
1146 for size in count(1):
1147 for i in range(15):
1148 yield size
1149 sizes = gen_sizes()
1150 while n < len(contents):
1151 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001152 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001153 intermediate_func(bufio)
1154 n += size
1155 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001156 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001157
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001158 def test_writes(self):
1159 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001160
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001161 def test_writes_and_flushes(self):
1162 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001163
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001164 def test_writes_and_seeks(self):
1165 def _seekabs(bufio):
1166 pos = bufio.tell()
1167 bufio.seek(pos + 1, 0)
1168 bufio.seek(pos - 1, 0)
1169 bufio.seek(pos, 0)
1170 self.check_writes(_seekabs)
1171 def _seekrel(bufio):
1172 pos = bufio.seek(0, 1)
1173 bufio.seek(+1, 1)
1174 bufio.seek(-1, 1)
1175 bufio.seek(pos, 0)
1176 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001177
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001178 def test_writes_and_truncates(self):
1179 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001180
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001181 def test_write_non_blocking(self):
1182 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001183 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001184
Ezio Melottib3aedd42010-11-20 19:04:17 +00001185 self.assertEqual(bufio.write(b"abcd"), 4)
1186 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001187 # 1 byte will be written, the rest will be buffered
1188 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001189 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001190
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001191 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1192 raw.block_on(b"0")
1193 try:
1194 bufio.write(b"opqrwxyz0123456789")
1195 except self.BlockingIOError as e:
1196 written = e.characters_written
1197 else:
1198 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001199 self.assertEqual(written, 16)
1200 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001201 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001202
Ezio Melottib3aedd42010-11-20 19:04:17 +00001203 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001204 s = raw.pop_written()
1205 # Previously buffered bytes were flushed
1206 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001207
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001208 def test_write_and_rewind(self):
1209 raw = io.BytesIO()
1210 bufio = self.tp(raw, 4)
1211 self.assertEqual(bufio.write(b"abcdef"), 6)
1212 self.assertEqual(bufio.tell(), 6)
1213 bufio.seek(0, 0)
1214 self.assertEqual(bufio.write(b"XY"), 2)
1215 bufio.seek(6, 0)
1216 self.assertEqual(raw.getvalue(), b"XYcdef")
1217 self.assertEqual(bufio.write(b"123456"), 6)
1218 bufio.flush()
1219 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001220
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001221 def test_flush(self):
1222 writer = self.MockRawIO()
1223 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001224 bufio.write(b"abc")
1225 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001226 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001227
Antoine Pitrou131a4892012-10-16 22:57:11 +02001228 def test_writelines(self):
1229 l = [b'ab', b'cd', b'ef']
1230 writer = self.MockRawIO()
1231 bufio = self.tp(writer, 8)
1232 bufio.writelines(l)
1233 bufio.flush()
1234 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1235
1236 def test_writelines_userlist(self):
1237 l = UserList([b'ab', b'cd', b'ef'])
1238 writer = self.MockRawIO()
1239 bufio = self.tp(writer, 8)
1240 bufio.writelines(l)
1241 bufio.flush()
1242 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1243
1244 def test_writelines_error(self):
1245 writer = self.MockRawIO()
1246 bufio = self.tp(writer, 8)
1247 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1248 self.assertRaises(TypeError, bufio.writelines, None)
1249 self.assertRaises(TypeError, bufio.writelines, 'abc')
1250
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001251 def test_destructor(self):
1252 writer = self.MockRawIO()
1253 bufio = self.tp(writer, 8)
1254 bufio.write(b"abc")
1255 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001256 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001257 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001258
1259 def test_truncate(self):
1260 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001261 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001262 bufio = self.tp(raw, 8)
1263 bufio.write(b"abcdef")
1264 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001265 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001266 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001267 self.assertEqual(f.read(), b"abc")
1268
Victor Stinner45df8202010-04-28 22:31:17 +00001269 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001270 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001271 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001272 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001273 # Write out many bytes from many threads and test they were
1274 # all flushed.
1275 N = 1000
1276 contents = bytes(range(256)) * N
1277 sizes = cycle([1, 19])
1278 n = 0
1279 queue = deque()
1280 while n < len(contents):
1281 size = next(sizes)
1282 queue.append(contents[n:n+size])
1283 n += size
1284 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001285 # We use a real file object because it allows us to
1286 # exercise situations where the GIL is released before
1287 # writing the buffer to the raw streams. This is in addition
1288 # to concurrency issues due to switching threads in the middle
1289 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001290 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001291 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001292 errors = []
1293 def f():
1294 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001295 while True:
1296 try:
1297 s = queue.popleft()
1298 except IndexError:
1299 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001300 bufio.write(s)
1301 except Exception as e:
1302 errors.append(e)
1303 raise
1304 threads = [threading.Thread(target=f) for x in range(20)]
1305 for t in threads:
1306 t.start()
1307 time.sleep(0.02) # yield
1308 for t in threads:
1309 t.join()
1310 self.assertFalse(errors,
1311 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001312 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001313 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001314 s = f.read()
1315 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001316 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001317 finally:
1318 support.unlink(support.TESTFN)
1319
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001320 def test_misbehaved_io(self):
1321 rawio = self.MisbehavedRawIO()
1322 bufio = self.tp(rawio, 5)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001323 self.assertRaises(OSError, bufio.seek, 0)
1324 self.assertRaises(OSError, bufio.tell)
1325 self.assertRaises(OSError, bufio.write, b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001326
Florent Xicluna109d5732012-07-07 17:03:22 +02001327 def test_max_buffer_size_removal(self):
1328 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001329 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001330
Benjamin Peterson68623612012-12-20 11:53:11 -06001331 def test_write_error_on_close(self):
1332 raw = self.MockRawIO()
1333 def bad_write(b):
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001334 raise OSError()
Benjamin Peterson68623612012-12-20 11:53:11 -06001335 raw.write = bad_write
1336 b = self.tp(raw)
1337 b.write(b'spam')
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001338 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001339 self.assertTrue(b.closed)
1340
Benjamin Peterson59406a92009-03-26 17:10:29 +00001341
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001342class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001343 tp = io.BufferedWriter
1344
1345 def test_constructor(self):
1346 BufferedWriterTest.test_constructor(self)
1347 # The allocation can succeed on 32-bit builds, e.g. with more
1348 # than 2GB RAM and a 64-bit kernel.
1349 if sys.maxsize > 0x7FFFFFFF:
1350 rawio = self.MockRawIO()
1351 bufio = self.tp(rawio)
1352 self.assertRaises((OverflowError, MemoryError, ValueError),
1353 bufio.__init__, rawio, sys.maxsize)
1354
1355 def test_initialization(self):
1356 rawio = self.MockRawIO()
1357 bufio = self.tp(rawio)
1358 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1359 self.assertRaises(ValueError, bufio.write, b"def")
1360 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1361 self.assertRaises(ValueError, bufio.write, b"def")
1362 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1363 self.assertRaises(ValueError, bufio.write, b"def")
1364
1365 def test_garbage_collection(self):
1366 # C BufferedWriter objects are collected, and collecting them flushes
1367 # all data to disk.
1368 # The Python version has __del__, so it ends into gc.garbage instead
1369 rawio = self.FileIO(support.TESTFN, "w+b")
1370 f = self.tp(rawio)
1371 f.write(b"123xxx")
1372 f.x = f
1373 wr = weakref.ref(f)
1374 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001375 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001376 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001377 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001378 self.assertEqual(f.read(), b"123xxx")
1379
R David Murray67bfe802013-02-23 21:51:05 -05001380 def test_args_error(self):
1381 # Issue #17275
1382 with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1383 self.tp(io.BytesIO(), 1024, 1024, 1024)
1384
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001385
1386class PyBufferedWriterTest(BufferedWriterTest):
1387 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001388
Guido van Rossum01a27522007-03-07 01:00:12 +00001389class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001390
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001391 def test_constructor(self):
1392 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001393 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001394
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001395 def test_detach(self):
1396 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1397 self.assertRaises(self.UnsupportedOperation, pair.detach)
1398
Florent Xicluna109d5732012-07-07 17:03:22 +02001399 def test_constructor_max_buffer_size_removal(self):
1400 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001401 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001402
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001403 def test_constructor_with_not_readable(self):
1404 class NotReadable(MockRawIO):
1405 def readable(self):
1406 return False
1407
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001408 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001409
1410 def test_constructor_with_not_writeable(self):
1411 class NotWriteable(MockRawIO):
1412 def writable(self):
1413 return False
1414
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001415 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001416
1417 def test_read(self):
1418 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1419
1420 self.assertEqual(pair.read(3), b"abc")
1421 self.assertEqual(pair.read(1), b"d")
1422 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001423 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1424 self.assertEqual(pair.read(None), b"abc")
1425
1426 def test_readlines(self):
1427 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1428 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1429 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1430 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001431
1432 def test_read1(self):
1433 # .read1() is delegated to the underlying reader object, so this test
1434 # can be shallow.
1435 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1436
1437 self.assertEqual(pair.read1(3), b"abc")
1438
1439 def test_readinto(self):
1440 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1441
1442 data = bytearray(5)
1443 self.assertEqual(pair.readinto(data), 5)
1444 self.assertEqual(data, b"abcde")
1445
1446 def test_write(self):
1447 w = self.MockRawIO()
1448 pair = self.tp(self.MockRawIO(), w)
1449
1450 pair.write(b"abc")
1451 pair.flush()
1452 pair.write(b"def")
1453 pair.flush()
1454 self.assertEqual(w._write_stack, [b"abc", b"def"])
1455
1456 def test_peek(self):
1457 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1458
1459 self.assertTrue(pair.peek(3).startswith(b"abc"))
1460 self.assertEqual(pair.read(3), b"abc")
1461
1462 def test_readable(self):
1463 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1464 self.assertTrue(pair.readable())
1465
1466 def test_writeable(self):
1467 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1468 self.assertTrue(pair.writable())
1469
1470 def test_seekable(self):
1471 # BufferedRWPairs are never seekable, even if their readers and writers
1472 # are.
1473 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1474 self.assertFalse(pair.seekable())
1475
1476 # .flush() is delegated to the underlying writer object and has been
1477 # tested in the test_write method.
1478
1479 def test_close_and_closed(self):
1480 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1481 self.assertFalse(pair.closed)
1482 pair.close()
1483 self.assertTrue(pair.closed)
1484
1485 def test_isatty(self):
1486 class SelectableIsAtty(MockRawIO):
1487 def __init__(self, isatty):
1488 MockRawIO.__init__(self)
1489 self._isatty = isatty
1490
1491 def isatty(self):
1492 return self._isatty
1493
1494 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1495 self.assertFalse(pair.isatty())
1496
1497 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1498 self.assertTrue(pair.isatty())
1499
1500 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1501 self.assertTrue(pair.isatty())
1502
1503 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1504 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001505
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001506class CBufferedRWPairTest(BufferedRWPairTest):
1507 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001508
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001509class PyBufferedRWPairTest(BufferedRWPairTest):
1510 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001511
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001512
1513class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1514 read_mode = "rb+"
1515 write_mode = "wb+"
1516
1517 def test_constructor(self):
1518 BufferedReaderTest.test_constructor(self)
1519 BufferedWriterTest.test_constructor(self)
1520
1521 def test_read_and_write(self):
1522 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001523 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001524
1525 self.assertEqual(b"as", rw.read(2))
1526 rw.write(b"ddd")
1527 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001528 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001529 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001530 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001531
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001532 def test_seek_and_tell(self):
1533 raw = self.BytesIO(b"asdfghjkl")
1534 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001535
Ezio Melottib3aedd42010-11-20 19:04:17 +00001536 self.assertEqual(b"as", rw.read(2))
1537 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001538 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001539 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00001540
Antoine Pitroue05565e2011-08-20 14:39:23 +02001541 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00001542 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02001543 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001544 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001545 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001546 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001547 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001548 self.assertEqual(7, rw.tell())
1549 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02001550 rw.flush()
1551 self.assertEqual(b"asdf123fl", raw.getvalue())
1552
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001553 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001554
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001555 def check_flush_and_read(self, read_func):
1556 raw = self.BytesIO(b"abcdefghi")
1557 bufio = self.tp(raw)
1558
Ezio Melottib3aedd42010-11-20 19:04:17 +00001559 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001560 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001561 self.assertEqual(b"ef", read_func(bufio, 2))
1562 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001563 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001564 self.assertEqual(6, bufio.tell())
1565 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001566 raw.seek(0, 0)
1567 raw.write(b"XYZ")
1568 # flush() resets the read buffer
1569 bufio.flush()
1570 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001571 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001572
1573 def test_flush_and_read(self):
1574 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1575
1576 def test_flush_and_readinto(self):
1577 def _readinto(bufio, n=-1):
1578 b = bytearray(n if n >= 0 else 9999)
1579 n = bufio.readinto(b)
1580 return bytes(b[:n])
1581 self.check_flush_and_read(_readinto)
1582
1583 def test_flush_and_peek(self):
1584 def _peek(bufio, n=-1):
1585 # This relies on the fact that the buffer can contain the whole
1586 # raw stream, otherwise peek() can return less.
1587 b = bufio.peek(n)
1588 if n != -1:
1589 b = b[:n]
1590 bufio.seek(len(b), 1)
1591 return b
1592 self.check_flush_and_read(_peek)
1593
1594 def test_flush_and_write(self):
1595 raw = self.BytesIO(b"abcdefghi")
1596 bufio = self.tp(raw)
1597
1598 bufio.write(b"123")
1599 bufio.flush()
1600 bufio.write(b"45")
1601 bufio.flush()
1602 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001603 self.assertEqual(b"12345fghi", raw.getvalue())
1604 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001605
1606 def test_threads(self):
1607 BufferedReaderTest.test_threads(self)
1608 BufferedWriterTest.test_threads(self)
1609
1610 def test_writes_and_peek(self):
1611 def _peek(bufio):
1612 bufio.peek(1)
1613 self.check_writes(_peek)
1614 def _peek(bufio):
1615 pos = bufio.tell()
1616 bufio.seek(-1, 1)
1617 bufio.peek(1)
1618 bufio.seek(pos, 0)
1619 self.check_writes(_peek)
1620
1621 def test_writes_and_reads(self):
1622 def _read(bufio):
1623 bufio.seek(-1, 1)
1624 bufio.read(1)
1625 self.check_writes(_read)
1626
1627 def test_writes_and_read1s(self):
1628 def _read1(bufio):
1629 bufio.seek(-1, 1)
1630 bufio.read1(1)
1631 self.check_writes(_read1)
1632
1633 def test_writes_and_readintos(self):
1634 def _read(bufio):
1635 bufio.seek(-1, 1)
1636 bufio.readinto(bytearray(1))
1637 self.check_writes(_read)
1638
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001639 def test_write_after_readahead(self):
1640 # Issue #6629: writing after the buffer was filled by readahead should
1641 # first rewind the raw stream.
1642 for overwrite_size in [1, 5]:
1643 raw = self.BytesIO(b"A" * 10)
1644 bufio = self.tp(raw, 4)
1645 # Trigger readahead
1646 self.assertEqual(bufio.read(1), b"A")
1647 self.assertEqual(bufio.tell(), 1)
1648 # Overwriting should rewind the raw stream if it needs so
1649 bufio.write(b"B" * overwrite_size)
1650 self.assertEqual(bufio.tell(), overwrite_size + 1)
1651 # If the write size was smaller than the buffer size, flush() and
1652 # check that rewind happens.
1653 bufio.flush()
1654 self.assertEqual(bufio.tell(), overwrite_size + 1)
1655 s = raw.getvalue()
1656 self.assertEqual(s,
1657 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1658
Antoine Pitrou7c404892011-05-13 00:13:33 +02001659 def test_write_rewind_write(self):
1660 # Various combinations of reading / writing / seeking backwards / writing again
1661 def mutate(bufio, pos1, pos2):
1662 assert pos2 >= pos1
1663 # Fill the buffer
1664 bufio.seek(pos1)
1665 bufio.read(pos2 - pos1)
1666 bufio.write(b'\x02')
1667 # This writes earlier than the previous write, but still inside
1668 # the buffer.
1669 bufio.seek(pos1)
1670 bufio.write(b'\x01')
1671
1672 b = b"\x80\x81\x82\x83\x84"
1673 for i in range(0, len(b)):
1674 for j in range(i, len(b)):
1675 raw = self.BytesIO(b)
1676 bufio = self.tp(raw, 100)
1677 mutate(bufio, i, j)
1678 bufio.flush()
1679 expected = bytearray(b)
1680 expected[j] = 2
1681 expected[i] = 1
1682 self.assertEqual(raw.getvalue(), expected,
1683 "failed result for i=%d, j=%d" % (i, j))
1684
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001685 def test_truncate_after_read_or_write(self):
1686 raw = self.BytesIO(b"A" * 10)
1687 bufio = self.tp(raw, 100)
1688 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1689 self.assertEqual(bufio.truncate(), 2)
1690 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1691 self.assertEqual(bufio.truncate(), 4)
1692
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001693 def test_misbehaved_io(self):
1694 BufferedReaderTest.test_misbehaved_io(self)
1695 BufferedWriterTest.test_misbehaved_io(self)
1696
Antoine Pitroue05565e2011-08-20 14:39:23 +02001697 def test_interleaved_read_write(self):
1698 # Test for issue #12213
1699 with self.BytesIO(b'abcdefgh') as raw:
1700 with self.tp(raw, 100) as f:
1701 f.write(b"1")
1702 self.assertEqual(f.read(1), b'b')
1703 f.write(b'2')
1704 self.assertEqual(f.read1(1), b'd')
1705 f.write(b'3')
1706 buf = bytearray(1)
1707 f.readinto(buf)
1708 self.assertEqual(buf, b'f')
1709 f.write(b'4')
1710 self.assertEqual(f.peek(1), b'h')
1711 f.flush()
1712 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1713
1714 with self.BytesIO(b'abc') as raw:
1715 with self.tp(raw, 100) as f:
1716 self.assertEqual(f.read(1), b'a')
1717 f.write(b"2")
1718 self.assertEqual(f.read(1), b'c')
1719 f.flush()
1720 self.assertEqual(raw.getvalue(), b'a2c')
1721
1722 def test_interleaved_readline_write(self):
1723 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1724 with self.tp(raw) as f:
1725 f.write(b'1')
1726 self.assertEqual(f.readline(), b'b\n')
1727 f.write(b'2')
1728 self.assertEqual(f.readline(), b'def\n')
1729 f.write(b'3')
1730 self.assertEqual(f.readline(), b'\n')
1731 f.flush()
1732 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1733
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001734 # You can't construct a BufferedRandom over a non-seekable stream.
1735 test_unseekable = None
1736
R David Murray67bfe802013-02-23 21:51:05 -05001737
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001738class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001739 tp = io.BufferedRandom
1740
1741 def test_constructor(self):
1742 BufferedRandomTest.test_constructor(self)
1743 # The allocation can succeed on 32-bit builds, e.g. with more
1744 # than 2GB RAM and a 64-bit kernel.
1745 if sys.maxsize > 0x7FFFFFFF:
1746 rawio = self.MockRawIO()
1747 bufio = self.tp(rawio)
1748 self.assertRaises((OverflowError, MemoryError, ValueError),
1749 bufio.__init__, rawio, sys.maxsize)
1750
1751 def test_garbage_collection(self):
1752 CBufferedReaderTest.test_garbage_collection(self)
1753 CBufferedWriterTest.test_garbage_collection(self)
1754
R David Murray67bfe802013-02-23 21:51:05 -05001755 def test_args_error(self):
1756 # Issue #17275
1757 with self.assertRaisesRegex(TypeError, "BufferedRandom"):
1758 self.tp(io.BytesIO(), 1024, 1024, 1024)
1759
1760
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001761class PyBufferedRandomTest(BufferedRandomTest):
1762 tp = pyio.BufferedRandom
1763
1764
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001765# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1766# properties:
1767# - A single output character can correspond to many bytes of input.
1768# - The number of input bytes to complete the character can be
1769# undetermined until the last input byte is received.
1770# - The number of input bytes can vary depending on previous input.
1771# - A single input byte can correspond to many characters of output.
1772# - The number of output characters can be undetermined until the
1773# last input byte is received.
1774# - The number of output characters can vary depending on previous input.
1775
1776class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1777 """
1778 For testing seek/tell behavior with a stateful, buffering decoder.
1779
1780 Input is a sequence of words. Words may be fixed-length (length set
1781 by input) or variable-length (period-terminated). In variable-length
1782 mode, extra periods are ignored. Possible words are:
1783 - 'i' followed by a number sets the input length, I (maximum 99).
1784 When I is set to 0, words are space-terminated.
1785 - 'o' followed by a number sets the output length, O (maximum 99).
1786 - Any other word is converted into a word followed by a period on
1787 the output. The output word consists of the input word truncated
1788 or padded out with hyphens to make its length equal to O. If O
1789 is 0, the word is output verbatim without truncating or padding.
1790 I and O are initially set to 1. When I changes, any buffered input is
1791 re-scanned according to the new I. EOF also terminates the last word.
1792 """
1793
1794 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001795 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001796 self.reset()
1797
1798 def __repr__(self):
1799 return '<SID %x>' % id(self)
1800
1801 def reset(self):
1802 self.i = 1
1803 self.o = 1
1804 self.buffer = bytearray()
1805
1806 def getstate(self):
1807 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1808 return bytes(self.buffer), i*100 + o
1809
1810 def setstate(self, state):
1811 buffer, io = state
1812 self.buffer = bytearray(buffer)
1813 i, o = divmod(io, 100)
1814 self.i, self.o = i ^ 1, o ^ 1
1815
1816 def decode(self, input, final=False):
1817 output = ''
1818 for b in input:
1819 if self.i == 0: # variable-length, terminated with period
1820 if b == ord('.'):
1821 if self.buffer:
1822 output += self.process_word()
1823 else:
1824 self.buffer.append(b)
1825 else: # fixed-length, terminate after self.i bytes
1826 self.buffer.append(b)
1827 if len(self.buffer) == self.i:
1828 output += self.process_word()
1829 if final and self.buffer: # EOF terminates the last word
1830 output += self.process_word()
1831 return output
1832
1833 def process_word(self):
1834 output = ''
1835 if self.buffer[0] == ord('i'):
1836 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1837 elif self.buffer[0] == ord('o'):
1838 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1839 else:
1840 output = self.buffer.decode('ascii')
1841 if len(output) < self.o:
1842 output += '-'*self.o # pad out with hyphens
1843 if self.o:
1844 output = output[:self.o] # truncate to output length
1845 output += '.'
1846 self.buffer = bytearray()
1847 return output
1848
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001849 codecEnabled = False
1850
1851 @classmethod
1852 def lookupTestDecoder(cls, name):
1853 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001854 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001855 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001856 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001857 incrementalencoder=None,
1858 streamreader=None, streamwriter=None,
1859 incrementaldecoder=cls)
1860
1861# Register the previous decoder for testing.
1862# Disabled by default, tests will enable it.
1863codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1864
1865
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001866class StatefulIncrementalDecoderTest(unittest.TestCase):
1867 """
1868 Make sure the StatefulIncrementalDecoder actually works.
1869 """
1870
1871 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001872 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001873 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001874 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001875 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001876 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001877 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001878 # I=0, O=6 (variable-length input, fixed-length output)
1879 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1880 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001881 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001882 # I=6, O=3 (fixed-length input > fixed-length output)
1883 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1884 # I=0, then 3; O=29, then 15 (with longer output)
1885 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1886 'a----------------------------.' +
1887 'b----------------------------.' +
1888 'cde--------------------------.' +
1889 'abcdefghijabcde.' +
1890 'a.b------------.' +
1891 '.c.------------.' +
1892 'd.e------------.' +
1893 'k--------------.' +
1894 'l--------------.' +
1895 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001896 ]
1897
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001898 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001899 # Try a few one-shot test cases.
1900 for input, eof, output in self.test_cases:
1901 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001902 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001903
1904 # Also test an unfinished decode, followed by forcing EOF.
1905 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001906 self.assertEqual(d.decode(b'oiabcd'), '')
1907 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001908
1909class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001910
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001911 def setUp(self):
1912 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1913 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001914 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001915
Guido van Rossumd0712812007-04-11 16:32:43 +00001916 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001917 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001918
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001919 def test_constructor(self):
1920 r = self.BytesIO(b"\xc3\xa9\n\n")
1921 b = self.BufferedReader(r, 1000)
1922 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001923 t.__init__(b, encoding="latin-1", newline="\r\n")
1924 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001925 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001926 t.__init__(b, encoding="utf-8", line_buffering=True)
1927 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001928 self.assertEqual(t.line_buffering, True)
1929 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001930 self.assertRaises(TypeError, t.__init__, b, newline=42)
1931 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1932
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001933 def test_detach(self):
1934 r = self.BytesIO()
1935 b = self.BufferedWriter(r)
1936 t = self.TextIOWrapper(b)
1937 self.assertIs(t.detach(), b)
1938
1939 t = self.TextIOWrapper(b, encoding="ascii")
1940 t.write("howdy")
1941 self.assertFalse(r.getvalue())
1942 t.detach()
1943 self.assertEqual(r.getvalue(), b"howdy")
1944 self.assertRaises(ValueError, t.detach)
1945
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001946 def test_repr(self):
1947 raw = self.BytesIO("hello".encode("utf-8"))
1948 b = self.BufferedReader(raw)
1949 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001950 modname = self.TextIOWrapper.__module__
1951 self.assertEqual(repr(t),
1952 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1953 raw.name = "dummy"
1954 self.assertEqual(repr(t),
1955 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001956 t.mode = "r"
1957 self.assertEqual(repr(t),
1958 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001959 raw.name = b"dummy"
1960 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001961 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001962
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001963 def test_line_buffering(self):
1964 r = self.BytesIO()
1965 b = self.BufferedWriter(r, 1000)
1966 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001967 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001968 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001969 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001970 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001971 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001972 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001973
Victor Stinnerf86a5e82012-06-05 13:43:22 +02001974 def test_default_encoding(self):
1975 old_environ = dict(os.environ)
1976 try:
1977 # try to get a user preferred encoding different than the current
1978 # locale encoding to check that TextIOWrapper() uses the current
1979 # locale encoding and not the user preferred encoding
1980 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
1981 if key in os.environ:
1982 del os.environ[key]
1983
1984 current_locale_encoding = locale.getpreferredencoding(False)
1985 b = self.BytesIO()
1986 t = self.TextIOWrapper(b)
1987 self.assertEqual(t.encoding, current_locale_encoding)
1988 finally:
1989 os.environ.clear()
1990 os.environ.update(old_environ)
1991
Serhiy Storchaka78980432013-01-15 01:12:17 +02001992 # Issue 15989
1993 def test_device_encoding(self):
1994 b = self.BytesIO()
1995 b.fileno = lambda: _testcapi.INT_MAX + 1
1996 self.assertRaises(OverflowError, self.TextIOWrapper, b)
1997 b.fileno = lambda: _testcapi.UINT_MAX + 1
1998 self.assertRaises(OverflowError, self.TextIOWrapper, b)
1999
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002000 def test_encoding(self):
2001 # Check the encoding attribute is always set, and valid
2002 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002003 t = self.TextIOWrapper(b, encoding="utf-8")
2004 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002005 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002006 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002007 codecs.lookup(t.encoding)
2008
2009 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002010 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002011 b = self.BytesIO(b"abc\n\xff\n")
2012 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002013 self.assertRaises(UnicodeError, t.read)
2014 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002015 b = self.BytesIO(b"abc\n\xff\n")
2016 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002017 self.assertRaises(UnicodeError, t.read)
2018 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002019 b = self.BytesIO(b"abc\n\xff\n")
2020 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002021 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002022 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002023 b = self.BytesIO(b"abc\n\xff\n")
2024 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002025 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002026
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002027 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002028 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002029 b = self.BytesIO()
2030 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002031 self.assertRaises(UnicodeError, t.write, "\xff")
2032 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002033 b = self.BytesIO()
2034 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002035 self.assertRaises(UnicodeError, t.write, "\xff")
2036 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002037 b = self.BytesIO()
2038 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002039 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002040 t.write("abc\xffdef\n")
2041 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002042 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002043 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002044 b = self.BytesIO()
2045 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002046 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002047 t.write("abc\xffdef\n")
2048 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002049 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002050
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002051 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002052 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2053
2054 tests = [
2055 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002056 [ '', input_lines ],
2057 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2058 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2059 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002060 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002061 encodings = (
2062 'utf-8', 'latin-1',
2063 'utf-16', 'utf-16-le', 'utf-16-be',
2064 'utf-32', 'utf-32-le', 'utf-32-be',
2065 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002066
Guido van Rossum8358db22007-08-18 21:39:55 +00002067 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002068 # character in TextIOWrapper._pending_line.
2069 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002070 # XXX: str.encode() should return bytes
2071 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002072 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002073 for bufsize in range(1, 10):
2074 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002075 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2076 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002077 encoding=encoding)
2078 if do_reads:
2079 got_lines = []
2080 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002081 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002082 if c2 == '':
2083 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002084 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002085 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002086 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002087 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002088
2089 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002090 self.assertEqual(got_line, exp_line)
2091 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002092
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002093 def test_newlines_input(self):
2094 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002095 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2096 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002097 (None, normalized.decode("ascii").splitlines(keepends=True)),
2098 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002099 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2100 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2101 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002102 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002103 buf = self.BytesIO(testdata)
2104 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002105 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002106 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002107 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002108
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002109 def test_newlines_output(self):
2110 testdict = {
2111 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2112 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2113 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2114 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2115 }
2116 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2117 for newline, expected in tests:
2118 buf = self.BytesIO()
2119 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2120 txt.write("AAA\nB")
2121 txt.write("BB\nCCC\n")
2122 txt.write("X\rY\r\nZ")
2123 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002124 self.assertEqual(buf.closed, False)
2125 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002126
2127 def test_destructor(self):
2128 l = []
2129 base = self.BytesIO
2130 class MyBytesIO(base):
2131 def close(self):
2132 l.append(self.getvalue())
2133 base.close(self)
2134 b = MyBytesIO()
2135 t = self.TextIOWrapper(b, encoding="ascii")
2136 t.write("abc")
2137 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002138 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002139 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002140
2141 def test_override_destructor(self):
2142 record = []
2143 class MyTextIO(self.TextIOWrapper):
2144 def __del__(self):
2145 record.append(1)
2146 try:
2147 f = super().__del__
2148 except AttributeError:
2149 pass
2150 else:
2151 f()
2152 def close(self):
2153 record.append(2)
2154 super().close()
2155 def flush(self):
2156 record.append(3)
2157 super().flush()
2158 b = self.BytesIO()
2159 t = MyTextIO(b, encoding="ascii")
2160 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002161 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002162 self.assertEqual(record, [1, 2, 3])
2163
2164 def test_error_through_destructor(self):
2165 # Test that the exception state is not modified by a destructor,
2166 # even if close() fails.
2167 rawio = self.CloseFailureIO()
2168 def f():
2169 self.TextIOWrapper(rawio).xyzzy
2170 with support.captured_output("stderr") as s:
2171 self.assertRaises(AttributeError, f)
2172 s = s.getvalue().strip()
2173 if s:
2174 # The destructor *may* have printed an unraisable error, check it
2175 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002176 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002177 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002178
Guido van Rossum9b76da62007-04-11 01:09:03 +00002179 # Systematic tests of the text I/O API
2180
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002181 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002182 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002183 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002184 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002185 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002186 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002187 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002188 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002189 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002190 self.assertEqual(f.tell(), 0)
2191 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002192 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002193 self.assertEqual(f.seek(0), 0)
2194 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002195 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002196 self.assertEqual(f.read(2), "ab")
2197 self.assertEqual(f.read(1), "c")
2198 self.assertEqual(f.read(1), "")
2199 self.assertEqual(f.read(), "")
2200 self.assertEqual(f.tell(), cookie)
2201 self.assertEqual(f.seek(0), 0)
2202 self.assertEqual(f.seek(0, 2), cookie)
2203 self.assertEqual(f.write("def"), 3)
2204 self.assertEqual(f.seek(cookie), cookie)
2205 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002206 if enc.startswith("utf"):
2207 self.multi_line_test(f, enc)
2208 f.close()
2209
2210 def multi_line_test(self, f, enc):
2211 f.seek(0)
2212 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002213 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002214 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002215 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002216 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002217 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002218 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002219 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002220 wlines.append((f.tell(), line))
2221 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002222 f.seek(0)
2223 rlines = []
2224 while True:
2225 pos = f.tell()
2226 line = f.readline()
2227 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002228 break
2229 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002230 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002231
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002232 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002233 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002234 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002235 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002236 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002237 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002238 p2 = f.tell()
2239 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002240 self.assertEqual(f.tell(), p0)
2241 self.assertEqual(f.readline(), "\xff\n")
2242 self.assertEqual(f.tell(), p1)
2243 self.assertEqual(f.readline(), "\xff\n")
2244 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002245 f.seek(0)
2246 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002247 self.assertEqual(line, "\xff\n")
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002248 self.assertRaises(OSError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002249 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002250 f.close()
2251
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002252 def test_seeking(self):
2253 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002254 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002255 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002256 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002257 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002258 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002259 suffix = bytes(u_suffix.encode("utf-8"))
2260 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002261 with self.open(support.TESTFN, "wb") as f:
2262 f.write(line*2)
2263 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2264 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002265 self.assertEqual(s, str(prefix, "ascii"))
2266 self.assertEqual(f.tell(), prefix_size)
2267 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002268
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002269 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002270 # Regression test for a specific bug
2271 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002272 with self.open(support.TESTFN, "wb") as f:
2273 f.write(data)
2274 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2275 f._CHUNK_SIZE # Just test that it exists
2276 f._CHUNK_SIZE = 2
2277 f.readline()
2278 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002279
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002280 def test_seek_and_tell(self):
2281 #Test seek/tell using the StatefulIncrementalDecoder.
2282 # Make test faster by doing smaller seeks
2283 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002284
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002285 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002286 """Tell/seek to various points within a data stream and ensure
2287 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002288 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002289 f.write(data)
2290 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002291 f = self.open(support.TESTFN, encoding='test_decoder')
2292 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002293 decoded = f.read()
2294 f.close()
2295
Neal Norwitze2b07052008-03-18 19:52:05 +00002296 for i in range(min_pos, len(decoded) + 1): # seek positions
2297 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002298 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002299 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002300 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002301 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002302 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002303 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002304 f.close()
2305
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002306 # Enable the test decoder.
2307 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002308
2309 # Run the tests.
2310 try:
2311 # Try each test case.
2312 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002313 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002314
2315 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002316 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2317 offset = CHUNK_SIZE - len(input)//2
2318 prefix = b'.'*offset
2319 # Don't bother seeking into the prefix (takes too long).
2320 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002321 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002322
2323 # Ensure our test decoder won't interfere with subsequent tests.
2324 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002325 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002326
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002327 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002328 data = "1234567890"
2329 tests = ("utf-16",
2330 "utf-16-le",
2331 "utf-16-be",
2332 "utf-32",
2333 "utf-32-le",
2334 "utf-32-be")
2335 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002336 buf = self.BytesIO()
2337 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002338 # Check if the BOM is written only once (see issue1753).
2339 f.write(data)
2340 f.write(data)
2341 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002342 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002343 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002344 self.assertEqual(f.read(), data * 2)
2345 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002346
Benjamin Petersona1b49012009-03-31 23:11:32 +00002347 def test_unreadable(self):
2348 class UnReadable(self.BytesIO):
2349 def readable(self):
2350 return False
2351 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002352 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00002353
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002354 def test_read_one_by_one(self):
2355 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002356 reads = ""
2357 while True:
2358 c = txt.read(1)
2359 if not c:
2360 break
2361 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002362 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002363
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002364 def test_readlines(self):
2365 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2366 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2367 txt.seek(0)
2368 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2369 txt.seek(0)
2370 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2371
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002372 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002373 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002374 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002375 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002376 reads = ""
2377 while True:
2378 c = txt.read(128)
2379 if not c:
2380 break
2381 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002382 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002383
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002384 def test_writelines(self):
2385 l = ['ab', 'cd', 'ef']
2386 buf = self.BytesIO()
2387 txt = self.TextIOWrapper(buf)
2388 txt.writelines(l)
2389 txt.flush()
2390 self.assertEqual(buf.getvalue(), b'abcdef')
2391
2392 def test_writelines_userlist(self):
2393 l = UserList(['ab', 'cd', 'ef'])
2394 buf = self.BytesIO()
2395 txt = self.TextIOWrapper(buf)
2396 txt.writelines(l)
2397 txt.flush()
2398 self.assertEqual(buf.getvalue(), b'abcdef')
2399
2400 def test_writelines_error(self):
2401 txt = self.TextIOWrapper(self.BytesIO())
2402 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2403 self.assertRaises(TypeError, txt.writelines, None)
2404 self.assertRaises(TypeError, txt.writelines, b'abc')
2405
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002406 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002407 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002408
2409 # read one char at a time
2410 reads = ""
2411 while True:
2412 c = txt.read(1)
2413 if not c:
2414 break
2415 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002416 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002417
2418 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002419 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002420 txt._CHUNK_SIZE = 4
2421
2422 reads = ""
2423 while True:
2424 c = txt.read(4)
2425 if not c:
2426 break
2427 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002428 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002429
2430 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002431 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002432 txt._CHUNK_SIZE = 4
2433
2434 reads = txt.read(4)
2435 reads += txt.read(4)
2436 reads += txt.readline()
2437 reads += txt.readline()
2438 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002439 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002440
2441 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002442 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002443 txt._CHUNK_SIZE = 4
2444
2445 reads = txt.read(4)
2446 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002447 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002448
2449 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002450 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002451 txt._CHUNK_SIZE = 4
2452
2453 reads = txt.read(4)
2454 pos = txt.tell()
2455 txt.seek(0)
2456 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002457 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002458
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002459 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002460 buffer = self.BytesIO(self.testdata)
2461 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002462
2463 self.assertEqual(buffer.seekable(), txt.seekable())
2464
Antoine Pitroue4501852009-05-14 18:55:55 +00002465 def test_append_bom(self):
2466 # The BOM is not written again when appending to a non-empty file
2467 filename = support.TESTFN
2468 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2469 with self.open(filename, 'w', encoding=charset) as f:
2470 f.write('aaa')
2471 pos = f.tell()
2472 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002473 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002474
2475 with self.open(filename, 'a', encoding=charset) as f:
2476 f.write('xxx')
2477 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002478 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002479
2480 def test_seek_bom(self):
2481 # Same test, but when seeking manually
2482 filename = support.TESTFN
2483 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2484 with self.open(filename, 'w', encoding=charset) as f:
2485 f.write('aaa')
2486 pos = f.tell()
2487 with self.open(filename, 'r+', encoding=charset) as f:
2488 f.seek(pos)
2489 f.write('zzz')
2490 f.seek(0)
2491 f.write('bbb')
2492 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002493 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002494
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002495 def test_errors_property(self):
2496 with self.open(support.TESTFN, "w") as f:
2497 self.assertEqual(f.errors, "strict")
2498 with self.open(support.TESTFN, "w", errors="replace") as f:
2499 self.assertEqual(f.errors, "replace")
2500
Brett Cannon31f59292011-02-21 19:29:56 +00002501 @support.no_tracing
Victor Stinner45df8202010-04-28 22:31:17 +00002502 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002503 def test_threads_write(self):
2504 # Issue6750: concurrent writes could duplicate data
2505 event = threading.Event()
2506 with self.open(support.TESTFN, "w", buffering=1) as f:
2507 def run(n):
2508 text = "Thread%03d\n" % n
2509 event.wait()
2510 f.write(text)
2511 threads = [threading.Thread(target=lambda n=x: run(n))
2512 for x in range(20)]
2513 for t in threads:
2514 t.start()
2515 time.sleep(0.02)
2516 event.set()
2517 for t in threads:
2518 t.join()
2519 with self.open(support.TESTFN) as f:
2520 content = f.read()
2521 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002522 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002523
Antoine Pitrou6be88762010-05-03 16:48:20 +00002524 def test_flush_error_on_close(self):
2525 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2526 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002527 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00002528 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002529 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06002530 self.assertTrue(txt.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00002531
2532 def test_multi_close(self):
2533 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2534 txt.close()
2535 txt.close()
2536 txt.close()
2537 self.assertRaises(ValueError, txt.flush)
2538
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002539 def test_unseekable(self):
2540 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2541 self.assertRaises(self.UnsupportedOperation, txt.tell)
2542 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2543
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002544 def test_readonly_attributes(self):
2545 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2546 buf = self.BytesIO(self.testdata)
2547 with self.assertRaises(AttributeError):
2548 txt.buffer = buf
2549
Antoine Pitroue96ec682011-07-23 21:46:35 +02002550 def test_rawio(self):
2551 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
2552 # that subprocess.Popen() can have the required unbuffered
2553 # semantics with universal_newlines=True.
2554 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2555 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
2556 # Reads
2557 self.assertEqual(txt.read(4), 'abcd')
2558 self.assertEqual(txt.readline(), 'efghi\n')
2559 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
2560
2561 def test_rawio_write_through(self):
2562 # Issue #12591: with write_through=True, writes don't need a flush
2563 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2564 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
2565 write_through=True)
2566 txt.write('1')
2567 txt.write('23\n4')
2568 txt.write('5')
2569 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2570
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002571 def test_read_nonbytes(self):
2572 # Issue #17106
2573 # Crash when underlying read() returns non-bytes
2574 t = self.TextIOWrapper(self.StringIO('a'))
2575 self.assertRaises(TypeError, t.read, 1)
2576 t = self.TextIOWrapper(self.StringIO('a'))
2577 self.assertRaises(TypeError, t.readline)
2578 t = self.TextIOWrapper(self.StringIO('a'))
2579 self.assertRaises(TypeError, t.read)
2580
2581 def test_illegal_decoder(self):
2582 # Issue #17106
2583 # Crash when decoder returns non-string
2584 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2585 encoding='quopri_codec')
2586 self.assertRaises(TypeError, t.read, 1)
2587 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2588 encoding='quopri_codec')
2589 self.assertRaises(TypeError, t.readline)
2590 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2591 encoding='quopri_codec')
2592 self.assertRaises(TypeError, t.read)
2593
2594
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002595class CTextIOWrapperTest(TextIOWrapperTest):
2596
2597 def test_initialization(self):
2598 r = self.BytesIO(b"\xc3\xa9\n\n")
2599 b = self.BufferedReader(r, 1000)
2600 t = self.TextIOWrapper(b)
2601 self.assertRaises(TypeError, t.__init__, b, newline=42)
2602 self.assertRaises(ValueError, t.read)
2603 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2604 self.assertRaises(ValueError, t.read)
2605
2606 def test_garbage_collection(self):
2607 # C TextIOWrapper objects are collected, and collecting them flushes
2608 # all data to disk.
2609 # The Python version has __del__, so it ends in gc.garbage instead.
2610 rawio = io.FileIO(support.TESTFN, "wb")
2611 b = self.BufferedWriter(rawio)
2612 t = self.TextIOWrapper(b, encoding="ascii")
2613 t.write("456def")
2614 t.x = t
2615 wr = weakref.ref(t)
2616 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002617 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002618 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002619 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002620 self.assertEqual(f.read(), b"456def")
2621
Charles-François Natali42c28cd2011-10-05 19:53:43 +02002622 def test_rwpair_cleared_before_textio(self):
2623 # Issue 13070: TextIOWrapper's finalization would crash when called
2624 # after the reference to the underlying BufferedRWPair's writer got
2625 # cleared by the GC.
2626 for i in range(1000):
2627 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2628 t1 = self.TextIOWrapper(b1, encoding="ascii")
2629 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2630 t2 = self.TextIOWrapper(b2, encoding="ascii")
2631 # circular references
2632 t1.buddy = t2
2633 t2.buddy = t1
2634 support.gc_collect()
2635
2636
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002637class PyTextIOWrapperTest(TextIOWrapperTest):
2638 pass
2639
2640
2641class IncrementalNewlineDecoderTest(unittest.TestCase):
2642
2643 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002644 # UTF-8 specific tests for a newline decoder
2645 def _check_decode(b, s, **kwargs):
2646 # We exercise getstate() / setstate() as well as decode()
2647 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002648 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002649 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002650 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002651
Antoine Pitrou180a3362008-12-14 16:36:46 +00002652 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002653
Antoine Pitrou180a3362008-12-14 16:36:46 +00002654 _check_decode(b'\xe8', "")
2655 _check_decode(b'\xa2', "")
2656 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002657
Antoine Pitrou180a3362008-12-14 16:36:46 +00002658 _check_decode(b'\xe8', "")
2659 _check_decode(b'\xa2', "")
2660 _check_decode(b'\x88', "\u8888")
2661
2662 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002663 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2664
Antoine Pitrou180a3362008-12-14 16:36:46 +00002665 decoder.reset()
2666 _check_decode(b'\n', "\n")
2667 _check_decode(b'\r', "")
2668 _check_decode(b'', "\n", final=True)
2669 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002670
Antoine Pitrou180a3362008-12-14 16:36:46 +00002671 _check_decode(b'\r', "")
2672 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002673
Antoine Pitrou180a3362008-12-14 16:36:46 +00002674 _check_decode(b'\r\r\n', "\n\n")
2675 _check_decode(b'\r', "")
2676 _check_decode(b'\r', "\n")
2677 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002678
Antoine Pitrou180a3362008-12-14 16:36:46 +00002679 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2680 _check_decode(b'\xe8\xa2\x88', "\u8888")
2681 _check_decode(b'\n', "\n")
2682 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2683 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002684
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002685 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002686 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002687 if encoding is not None:
2688 encoder = codecs.getincrementalencoder(encoding)()
2689 def _decode_bytewise(s):
2690 # Decode one byte at a time
2691 for b in encoder.encode(s):
2692 result.append(decoder.decode(bytes([b])))
2693 else:
2694 encoder = None
2695 def _decode_bytewise(s):
2696 # Decode one char at a time
2697 for c in s:
2698 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002699 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002700 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002701 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00002702 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002703 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002704 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002705 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002706 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002707 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002708 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002709 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002710 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002711 input = "abc"
2712 if encoder is not None:
2713 encoder.reset()
2714 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002715 self.assertEqual(decoder.decode(input), "abc")
2716 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002717
2718 def test_newline_decoder(self):
2719 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002720 # None meaning the IncrementalNewlineDecoder takes unicode input
2721 # rather than bytes input
2722 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002723 'utf-16', 'utf-16-le', 'utf-16-be',
2724 'utf-32', 'utf-32-le', 'utf-32-be',
2725 )
2726 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002727 decoder = enc and codecs.getincrementaldecoder(enc)()
2728 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2729 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002730 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002731 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2732 self.check_newline_decoding_utf8(decoder)
2733
Antoine Pitrou66913e22009-03-06 23:40:56 +00002734 def test_newline_bytes(self):
2735 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2736 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002737 self.assertEqual(dec.newlines, None)
2738 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2739 self.assertEqual(dec.newlines, None)
2740 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2741 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00002742 dec = self.IncrementalNewlineDecoder(None, translate=False)
2743 _check(dec)
2744 dec = self.IncrementalNewlineDecoder(None, translate=True)
2745 _check(dec)
2746
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002747class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2748 pass
2749
2750class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2751 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002752
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002753
Guido van Rossum01a27522007-03-07 01:00:12 +00002754# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002755
Guido van Rossum5abbf752007-08-27 17:39:33 +00002756class MiscIOTest(unittest.TestCase):
2757
Barry Warsaw40e82462008-11-20 20:14:50 +00002758 def tearDown(self):
2759 support.unlink(support.TESTFN)
2760
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002761 def test___all__(self):
2762 for name in self.io.__all__:
2763 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002764 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002765 if name == "open":
2766 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002767 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002768 self.assertTrue(issubclass(obj, Exception), name)
2769 elif not name.startswith("SEEK_"):
2770 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002771
Barry Warsaw40e82462008-11-20 20:14:50 +00002772 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002773 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002774 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002775 f.close()
2776
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002777 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002778 self.assertEqual(f.name, support.TESTFN)
2779 self.assertEqual(f.buffer.name, support.TESTFN)
2780 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2781 self.assertEqual(f.mode, "U")
2782 self.assertEqual(f.buffer.mode, "rb")
2783 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002784 f.close()
2785
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002786 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002787 self.assertEqual(f.mode, "w+")
2788 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2789 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002790
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002791 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002792 self.assertEqual(g.mode, "wb")
2793 self.assertEqual(g.raw.mode, "wb")
2794 self.assertEqual(g.name, f.fileno())
2795 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00002796 f.close()
2797 g.close()
2798
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002799 def test_io_after_close(self):
2800 for kwargs in [
2801 {"mode": "w"},
2802 {"mode": "wb"},
2803 {"mode": "w", "buffering": 1},
2804 {"mode": "w", "buffering": 2},
2805 {"mode": "wb", "buffering": 0},
2806 {"mode": "r"},
2807 {"mode": "rb"},
2808 {"mode": "r", "buffering": 1},
2809 {"mode": "r", "buffering": 2},
2810 {"mode": "rb", "buffering": 0},
2811 {"mode": "w+"},
2812 {"mode": "w+b"},
2813 {"mode": "w+", "buffering": 1},
2814 {"mode": "w+", "buffering": 2},
2815 {"mode": "w+b", "buffering": 0},
2816 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002817 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002818 f.close()
2819 self.assertRaises(ValueError, f.flush)
2820 self.assertRaises(ValueError, f.fileno)
2821 self.assertRaises(ValueError, f.isatty)
2822 self.assertRaises(ValueError, f.__iter__)
2823 if hasattr(f, "peek"):
2824 self.assertRaises(ValueError, f.peek, 1)
2825 self.assertRaises(ValueError, f.read)
2826 if hasattr(f, "read1"):
2827 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02002828 if hasattr(f, "readall"):
2829 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002830 if hasattr(f, "readinto"):
2831 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2832 self.assertRaises(ValueError, f.readline)
2833 self.assertRaises(ValueError, f.readlines)
2834 self.assertRaises(ValueError, f.seek, 0)
2835 self.assertRaises(ValueError, f.tell)
2836 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002837 self.assertRaises(ValueError, f.write,
2838 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002839 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002840 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002841
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002842 def test_blockingioerror(self):
2843 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002844 class C(str):
2845 pass
2846 c = C("")
2847 b = self.BlockingIOError(1, c)
2848 c.b = b
2849 b.c = c
2850 wr = weakref.ref(c)
2851 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002852 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002853 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002854
2855 def test_abcs(self):
2856 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002857 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2858 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2859 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2860 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002861
2862 def _check_abc_inheritance(self, abcmodule):
2863 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002864 self.assertIsInstance(f, abcmodule.IOBase)
2865 self.assertIsInstance(f, abcmodule.RawIOBase)
2866 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2867 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002868 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002869 self.assertIsInstance(f, abcmodule.IOBase)
2870 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2871 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2872 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002873 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002874 self.assertIsInstance(f, abcmodule.IOBase)
2875 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2876 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2877 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002878
2879 def test_abc_inheritance(self):
2880 # Test implementations inherit from their respective ABCs
2881 self._check_abc_inheritance(self)
2882
2883 def test_abc_inheritance_official(self):
2884 # Test implementations inherit from the official ABCs of the
2885 # baseline "io" module.
2886 self._check_abc_inheritance(io)
2887
Antoine Pitroue033e062010-10-29 10:38:18 +00002888 def _check_warn_on_dealloc(self, *args, **kwargs):
2889 f = open(*args, **kwargs)
2890 r = repr(f)
2891 with self.assertWarns(ResourceWarning) as cm:
2892 f = None
2893 support.gc_collect()
2894 self.assertIn(r, str(cm.warning.args[0]))
2895
2896 def test_warn_on_dealloc(self):
2897 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
2898 self._check_warn_on_dealloc(support.TESTFN, "wb")
2899 self._check_warn_on_dealloc(support.TESTFN, "w")
2900
2901 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
2902 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00002903 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00002904 for fd in fds:
2905 try:
2906 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02002907 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00002908 if e.errno != errno.EBADF:
2909 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00002910 self.addCleanup(cleanup_fds)
2911 r, w = os.pipe()
2912 fds += r, w
2913 self._check_warn_on_dealloc(r, *args, **kwargs)
2914 # When using closefd=False, there's no warning
2915 r, w = os.pipe()
2916 fds += r, w
2917 with warnings.catch_warnings(record=True) as recorded:
2918 open(r, *args, closefd=False, **kwargs)
2919 support.gc_collect()
2920 self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00002921
2922 def test_warn_on_dealloc_fd(self):
2923 self._check_warn_on_dealloc_fd("rb", buffering=0)
2924 self._check_warn_on_dealloc_fd("rb")
2925 self._check_warn_on_dealloc_fd("r")
2926
2927
Antoine Pitrou243757e2010-11-05 21:15:39 +00002928 def test_pickling(self):
2929 # Pickling file objects is forbidden
2930 for kwargs in [
2931 {"mode": "w"},
2932 {"mode": "wb"},
2933 {"mode": "wb", "buffering": 0},
2934 {"mode": "r"},
2935 {"mode": "rb"},
2936 {"mode": "rb", "buffering": 0},
2937 {"mode": "w+"},
2938 {"mode": "w+b"},
2939 {"mode": "w+b", "buffering": 0},
2940 ]:
2941 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
2942 with self.open(support.TESTFN, **kwargs) as f:
2943 self.assertRaises(TypeError, pickle.dumps, f, protocol)
2944
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01002945 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2946 def test_nonblock_pipe_write_bigbuf(self):
2947 self._test_nonblock_pipe_write(16*1024)
2948
2949 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2950 def test_nonblock_pipe_write_smallbuf(self):
2951 self._test_nonblock_pipe_write(1024)
2952
2953 def _set_non_blocking(self, fd):
2954 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
2955 self.assertNotEqual(flags, -1)
2956 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
2957 self.assertEqual(res, 0)
2958
2959 def _test_nonblock_pipe_write(self, bufsize):
2960 sent = []
2961 received = []
2962 r, w = os.pipe()
2963 self._set_non_blocking(r)
2964 self._set_non_blocking(w)
2965
2966 # To exercise all code paths in the C implementation we need
2967 # to play with buffer sizes. For instance, if we choose a
2968 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
2969 # then we will never get a partial write of the buffer.
2970 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
2971 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
2972
2973 with rf, wf:
2974 for N in 9999, 73, 7574:
2975 try:
2976 i = 0
2977 while True:
2978 msg = bytes([i % 26 + 97]) * N
2979 sent.append(msg)
2980 wf.write(msg)
2981 i += 1
2982
2983 except self.BlockingIOError as e:
2984 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01002985 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01002986 sent[-1] = sent[-1][:e.characters_written]
2987 received.append(rf.read())
2988 msg = b'BLOCKED'
2989 wf.write(msg)
2990 sent.append(msg)
2991
2992 while True:
2993 try:
2994 wf.flush()
2995 break
2996 except self.BlockingIOError as e:
2997 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01002998 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01002999 self.assertEqual(e.characters_written, 0)
3000 received.append(rf.read())
3001
3002 received += iter(rf.read, None)
3003
3004 sent, received = b''.join(sent), b''.join(received)
3005 self.assertTrue(sent == received)
3006 self.assertTrue(wf.closed)
3007 self.assertTrue(rf.closed)
3008
Charles-François Natalidc3044c2012-01-09 22:40:02 +01003009 def test_create_fail(self):
3010 # 'x' mode fails if file is existing
3011 with self.open(support.TESTFN, 'w'):
3012 pass
3013 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
3014
3015 def test_create_writes(self):
3016 # 'x' mode opens for writing
3017 with self.open(support.TESTFN, 'xb') as f:
3018 f.write(b"spam")
3019 with self.open(support.TESTFN, 'rb') as f:
3020 self.assertEqual(b"spam", f.read())
3021
Christian Heimes7b648752012-09-10 14:48:43 +02003022 def test_open_allargs(self):
3023 # there used to be a buffer overflow in the parser for rawmode
3024 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
3025
3026
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003027class CMiscIOTest(MiscIOTest):
3028 io = io
3029
3030class PyMiscIOTest(MiscIOTest):
3031 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003032
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003033
3034@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3035class SignalsTest(unittest.TestCase):
3036
3037 def setUp(self):
3038 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
3039
3040 def tearDown(self):
3041 signal.signal(signal.SIGALRM, self.oldalrm)
3042
3043 def alarm_interrupt(self, sig, frame):
3044 1/0
3045
3046 @unittest.skipUnless(threading, 'Threading required for this test.')
3047 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
3048 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00003049 invokes the signal handler, and bubbles up the exception raised
3050 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003051 read_results = []
3052 def _read():
Victor Stinnera9293352011-04-30 15:21:58 +02003053 if hasattr(signal, 'pthread_sigmask'):
3054 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003055 s = os.read(r, 1)
3056 read_results.append(s)
3057 t = threading.Thread(target=_read)
3058 t.daemon = True
3059 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00003060 fdopen_kwargs["closefd"] = False
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003061 try:
3062 wio = self.io.open(w, **fdopen_kwargs)
3063 t.start()
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003064 signal.alarm(1)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003065 # Fill the pipe enough that the write will be blocking.
3066 # It will be interrupted by the timer armed above. Since the
3067 # other thread has read one byte, the low-level write will
3068 # return with a successful (partial) result rather than an EINTR.
3069 # The buffered IO layer must check for pending signal
3070 # handlers, which in this case will invoke alarm_interrupt().
3071 self.assertRaises(ZeroDivisionError,
Charles-François Natali2d517212011-05-29 16:36:44 +02003072 wio.write, item * (support.PIPE_MAX_SIZE // len(item)))
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003073 t.join()
3074 # We got one byte, get another one and check that it isn't a
3075 # repeat of the first one.
3076 read_results.append(os.read(r, 1))
3077 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3078 finally:
3079 os.close(w)
3080 os.close(r)
3081 # This is deliberate. If we didn't close the file descriptor
3082 # before closing wio, wio would try to flush its internal
3083 # buffer, and block again.
3084 try:
3085 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003086 except OSError as e:
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003087 if e.errno != errno.EBADF:
3088 raise
3089
3090 def test_interrupted_write_unbuffered(self):
3091 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3092
3093 def test_interrupted_write_buffered(self):
3094 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3095
3096 def test_interrupted_write_text(self):
3097 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3098
Brett Cannon31f59292011-02-21 19:29:56 +00003099 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003100 def check_reentrant_write(self, data, **fdopen_kwargs):
3101 def on_alarm(*args):
3102 # Will be called reentrantly from the same thread
3103 wio.write(data)
3104 1/0
3105 signal.signal(signal.SIGALRM, on_alarm)
3106 r, w = os.pipe()
3107 wio = self.io.open(w, **fdopen_kwargs)
3108 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003109 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003110 # Either the reentrant call to wio.write() fails with RuntimeError,
3111 # or the signal handler raises ZeroDivisionError.
3112 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3113 while 1:
3114 for i in range(100):
3115 wio.write(data)
3116 wio.flush()
3117 # Make sure the buffer doesn't fill up and block further writes
3118 os.read(r, len(data) * 100)
3119 exc = cm.exception
3120 if isinstance(exc, RuntimeError):
3121 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3122 finally:
3123 wio.close()
3124 os.close(r)
3125
3126 def test_reentrant_write_buffered(self):
3127 self.check_reentrant_write(b"xy", mode="wb")
3128
3129 def test_reentrant_write_text(self):
3130 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3131
Antoine Pitrou707ce822011-02-25 21:24:11 +00003132 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3133 """Check that a buffered read, when it gets interrupted (either
3134 returning a partial result or EINTR), properly invokes the signal
3135 handler and retries if the latter returned successfully."""
3136 r, w = os.pipe()
3137 fdopen_kwargs["closefd"] = False
3138 def alarm_handler(sig, frame):
3139 os.write(w, b"bar")
3140 signal.signal(signal.SIGALRM, alarm_handler)
3141 try:
3142 rio = self.io.open(r, **fdopen_kwargs)
3143 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003144 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003145 # Expected behaviour:
3146 # - first raw read() returns partial b"foo"
3147 # - second raw read() returns EINTR
3148 # - third raw read() returns b"bar"
3149 self.assertEqual(decode(rio.read(6)), "foobar")
3150 finally:
3151 rio.close()
3152 os.close(w)
3153 os.close(r)
3154
Antoine Pitrou20db5112011-08-19 20:32:34 +02003155 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003156 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3157 mode="rb")
3158
Antoine Pitrou20db5112011-08-19 20:32:34 +02003159 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003160 self.check_interrupted_read_retry(lambda x: x,
3161 mode="r")
3162
3163 @unittest.skipUnless(threading, 'Threading required for this test.')
3164 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3165 """Check that a buffered write, when it gets interrupted (either
3166 returning a partial result or EINTR), properly invokes the signal
3167 handler and retries if the latter returned successfully."""
3168 select = support.import_module("select")
3169 # A quantity that exceeds the buffer size of an anonymous pipe's
3170 # write end.
3171 N = 1024 * 1024
3172 r, w = os.pipe()
3173 fdopen_kwargs["closefd"] = False
3174 # We need a separate thread to read from the pipe and allow the
3175 # write() to finish. This thread is started after the SIGALRM is
3176 # received (forcing a first EINTR in write()).
3177 read_results = []
3178 write_finished = False
3179 def _read():
3180 while not write_finished:
3181 while r in select.select([r], [], [], 1.0)[0]:
3182 s = os.read(r, 1024)
3183 read_results.append(s)
3184 t = threading.Thread(target=_read)
3185 t.daemon = True
3186 def alarm1(sig, frame):
3187 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003188 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003189 def alarm2(sig, frame):
3190 t.start()
3191 signal.signal(signal.SIGALRM, alarm1)
3192 try:
3193 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003194 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003195 # Expected behaviour:
3196 # - first raw write() is partial (because of the limited pipe buffer
3197 # and the first alarm)
3198 # - second raw write() returns EINTR (because of the second alarm)
3199 # - subsequent write()s are successful (either partial or complete)
3200 self.assertEqual(N, wio.write(item * N))
3201 wio.flush()
3202 write_finished = True
3203 t.join()
3204 self.assertEqual(N, sum(len(x) for x in read_results))
3205 finally:
3206 write_finished = True
3207 os.close(w)
3208 os.close(r)
3209 # This is deliberate. If we didn't close the file descriptor
3210 # before closing wio, wio would try to flush its internal
3211 # buffer, and could block (in case of failure).
3212 try:
3213 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003214 except OSError as e:
Antoine Pitrou707ce822011-02-25 21:24:11 +00003215 if e.errno != errno.EBADF:
3216 raise
3217
Antoine Pitrou20db5112011-08-19 20:32:34 +02003218 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003219 self.check_interrupted_write_retry(b"x", mode="wb")
3220
Antoine Pitrou20db5112011-08-19 20:32:34 +02003221 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003222 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3223
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003224
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003225class CSignalsTest(SignalsTest):
3226 io = io
3227
3228class PySignalsTest(SignalsTest):
3229 io = pyio
3230
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003231 # Handling reentrancy issues would slow down _pyio even more, so the
3232 # tests are disabled.
3233 test_reentrant_write_buffered = None
3234 test_reentrant_write_text = None
3235
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003236
Ezio Melottidaa42c72013-03-23 16:30:16 +02003237def load_tests(*args):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003238 tests = (CIOTest, PyIOTest,
3239 CBufferedReaderTest, PyBufferedReaderTest,
3240 CBufferedWriterTest, PyBufferedWriterTest,
3241 CBufferedRWPairTest, PyBufferedRWPairTest,
3242 CBufferedRandomTest, PyBufferedRandomTest,
3243 StatefulIncrementalDecoderTest,
3244 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3245 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003246 CMiscIOTest, PyMiscIOTest,
3247 CSignalsTest, PySignalsTest,
3248 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003249
3250 # Put the namespaces of the IO module we are testing and some useful mock
3251 # classes in the __dict__ of each test.
3252 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou328ec742010-09-14 18:37:24 +00003253 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003254 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3255 c_io_ns = {name : getattr(io, name) for name in all_members}
3256 py_io_ns = {name : getattr(pyio, name) for name in all_members}
3257 globs = globals()
3258 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3259 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3260 # Avoid turning open into a bound method.
3261 py_io_ns["open"] = pyio.OpenWrapper
3262 for test in tests:
3263 if test.__name__.startswith("C"):
3264 for name, obj in c_io_ns.items():
3265 setattr(test, name, obj)
3266 elif test.__name__.startswith("Py"):
3267 for name, obj in py_io_ns.items():
3268 setattr(test, name, obj)
3269
Ezio Melottidaa42c72013-03-23 16:30:16 +02003270 suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests])
3271 return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00003272
3273if __name__ == "__main__":
Ezio Melottidaa42c72013-03-23 16:30:16 +02003274 unittest.main()