blob: 8727ddeaf184a0a7aa4e515445e5a1e2f78c131c [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)

################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Serhiy Storchaka78980432013-01-15 01:12:17 +020035import _testcapi
Antoine Pitrou131a4892012-10-16 22:57:11 +020036from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020037from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000038from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010047try:
48 import fcntl
49except ImportError:
50 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000051
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000052def _default_chunk_size():
53 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000054 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000055 return f._CHUNK_SIZE
56
57
class MockRawIOWithoutRead:
    """Scripted raw stream that deliberately lacks read().

    Because only readinto() is provided, combining this class with a
    RawIOBase exercises the base class's default read(), which is built on
    readinto().  *read_stack* is the sequence of chunks handed out by
    successive readinto() calls; a ``None`` entry makes readinto() return
    None once.
    """

    def __init__(self, read_stack=()):
        # Bookkeeping counters first, then the scripted input/output queues.
        self._reads = 0
        self._extraneous_reads = 0
        self._write_stack = []
        self._read_stack = list(read_stack)

    # -- capability flags: the mock claims to support everything ----------

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def fileno(self):
        return 42

    # -- positioning stubs -------------------------------------------------

    def seek(self, pos, whence):
        # wrong but we gotta return something
        return 0

    def tell(self):
        # same comment as for seek()
        return 0

    def truncate(self, pos=None):
        return pos

    # -- data transfer -----------------------------------------------------

    def write(self, b):
        """Record an immutable copy of *b* and report it fully written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def readinto(self, buf):
        """Copy the next scripted chunk into *buf*.

        Returns the number of bytes copied, None when the next scripted
        entry is None, or 0 when the script is exhausted (such a call is
        also counted in _extraneous_reads).
        """
        self._reads += 1
        room = len(buf)
        if not self._read_stack:
            self._extraneous_reads += 1
            return 0

        pending = self._read_stack[0]
        if pending is None:
            self._read_stack.pop(0)
            return None

        size = len(pending)
        if size > room:
            # Chunk does not fit: fill the buffer and keep the remainder
            # at the head of the script for the next call.
            buf[:] = pending[:room]
            self._read_stack[0] = pending[room:]
            return room

        self._read_stack.pop(0)
        buf[:size] = pending
        return size
113
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead layered on RawIOBase from the C io module."""
116
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead layered on RawIOBase from the Python _pyio module."""
119
120
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus a read() that hands out scripted chunks whole."""

    def read(self, n=None):
        """Pop and return the next scripted chunk; *n* is ignored.

        Once the script is exhausted, return b"" and count the call in
        _extraneous_reads.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # An empty script is the only expected failure here; anything
            # else (KeyboardInterrupt included) must propagate rather than
            # be mistaken for end-of-data.
            self._extraneous_reads += 1
            return b""
130
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO layered on RawIOBase from the C io module."""
133
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO layered on RawIOBase from the Python _pyio module."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000136
Guido van Rossuma9e20242007-03-08 00:43:48 +0000137
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods report deliberately bogus results."""

    def write(self, b):
        # Claim twice as many bytes as were actually recorded.
        recorded = super().write(b)
        return recorded * 2

    def read(self, n=None):
        # Hand back the scripted chunk repeated twice.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        # Negative position, whatever was asked for.
        return -123

    def tell(self):
        # Negative position here as well.
        return -456

    def readinto(self, buf):
        # Perform the real transfer but report five times the buffer size.
        super().readinto(buf)
        return 5 * len(buf)
154
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO layered on RawIOBase from the C io module."""
157
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO layered on RawIOBase from the Python _pyio module."""
160
161
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() marks the stream closed and then raises OSError."""

    closed = 0

    def close(self):
        # Later calls are silent no-ops.
        if self.closed:
            return
        # Flip the flag before raising so the failure happens only once.
        self.closed = 1
        raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000169
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO layered on RawIOBase from the C io module."""
172
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO layered on RawIOBase from the Python _pyio module."""
175
176
class MockFileIO:
    """Mixin that logs the size of every read()/readinto() result.

    It must be combined with a concrete stream class (the next class in the
    MRO) that accepts the initial *data* and performs the real reads.
    """

    def __init__(self, data):
        # The log must exist before the real stream is initialised.
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        """Delegate to the real read() and log len(result), or None."""
        chunk = super().read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        """Delegate to the real readinto() and log its return value as-is."""
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000192
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO backed by BytesIO from the C io module."""
Guido van Rossuma9e20242007-03-08 00:43:48 +0000195
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO backed by BytesIO from the Python _pyio module."""
198
199
class MockUnseekableIO:
    """Mixin that turns a stream into a non-seekable one.

    The concrete class must provide an ``UnsupportedOperation`` attribute
    holding the exception type to raise.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        # Arguments are accepted but never inspected.
        error_type = self.UnsupportedOperation
        raise error_type("not seekable")

    def tell(self, *args):
        # Same refusal as seek().
        error_type = self.UnsupportedOperation
        raise error_type("not seekable")
209
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """MockUnseekableIO backed by BytesIO from the C io module."""

    UnsupportedOperation = io.UnsupportedOperation
212
class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """MockUnseekableIO backed by BytesIO from the Python _pyio module."""

    UnsupportedOperation = pyio.UnsupportedOperation
215
216
class MockNonBlockWriterIO:
    """Raw-stream mixin that simulates a writer which can "block".

    Written data is accumulated in memory.  After block_on() has armed a
    blocker, a write containing it is cut short just before the blocker;
    a write that *starts* with the blocker returns None and disarms it.
    """

    def __init__(self):
        self._blocker_char = None
        self._write_stack = []

    def pop_written(self):
        """Return everything written so far and empty the record."""
        written = b"".join(self._write_stack)
        self._write_stack.clear()
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def write(self, b):
        """Record *b*, honouring the armed blocker if there is one.

        Returns the number of bytes accepted, or None when the data starts
        with the blocker.
        """
        payload = bytes(b)
        blocker = self._blocker_char
        if blocker:
            cut = payload.find(blocker)
            if cut > 0:
                # write data up to the first blocker
                self._write_stack.append(payload[:cut])
                return cut
            if cut == 0:
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
            # Blocker absent from this payload: fall through to a full write.
        self._write_stack.append(payload)
        return len(payload)
260
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO layered on RawIOBase from the C io module."""

    BlockingIOError = io.BlockingIOError
263
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO layered on RawIOBase from the Python _pyio module."""

    BlockingIOError = pyio.BlockingIOError
266
Guido van Rossuma9e20242007-03-08 00:43:48 +0000267
class IOTest(unittest.TestCase):
    """Implementation-independent tests of basic file and stream behaviour.

    The tests reach the code under test through attributes of ``self``
    (``open``, ``FileIO``, ``BytesIO``, ``StringIO``, ``IOBase``,
    ``RawIOBase``, ``BufferedIOBase``, ``TextIOBase``,
    ``UnsupportedOperation``, ``SEEK_CUR``, ``SEEK_END``,
    ``MockRawIOWithoutRead``) which are not defined in this class.
    NOTE(review): presumably they are injected into the concrete C/Python
    subclasses elsewhere in this module -- confirm.
    """

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        """Run a fixed script of write/seek/truncate calls against *f*.

        The expected return values below pin down that truncate() does not
        move the file position.
        """
        self.assertEqual(f.write(b"blah."), 5)
        f.truncate(0)
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Run a fixed script of read/readinto/seek calls against *f*.

        *f* must contain b"hello world\\n".  With *buffered* true the
        argument-less read() is exercised as well.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by the large-file tests.
    LARGE = 2**31

    def large_file_ops(self, f):
        """Exercise seek/tell/write/truncate around the LARGE offset on *f*."""
        # Real assertions rather than ``assert`` statements, so the
        # preconditions are still checked when running under -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        exc = self.UnsupportedOperation
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(exc, fp.read)
                self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "wb", buffering=0) as fp:
            self.assertRaises(exc, fp.read)
            self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "rb", buffering=0) as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(exc, fp.write, "blah")
            self.assertRaises(exc, fp.writelines, ["blah\n"])
            # Non-zero seeking from current or end pos
            self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
            self.assertRaises(exc, fp.seek, -1, self.SEEK_END)

    def test_open_handles_NUL_chars(self):
        fn_with_NUL = 'foo\0bar'
        self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
        self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')

    def test_raw_file_io(self):
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; It takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                # Report a genuine skip (as test_unbounded_file does) instead
                # of printing to stderr and letting the test count as passed.
                self.skipTest(
                    "large file ops on %s require %d bytes and a long time; "
                    "use 'regrtest.py -u largefile test_io' to run it"
                    % (sys.platform, self.LARGE))
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        # Destroying an unclosed FileIO subclass must run __del__, close()
        # and flush(), in that order, and the data must reach the disk.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        with support.check_warnings(('', ResourceWarning)):
            f = MyFileIO(support.TESTFN, "wb")
            f.write(b"xxx")
            del f
            support.gc_collect()
            self.assertEqual(record, [1, 2, 3])
            with self.open(support.TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        """Check that destroying a subclass of *base* runs __del__, close()
        and flush() in that order, with instance attributes still readable."""
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        # write() must report the byte length of the array, not its item count.
        a = array.array('i', range(10))
        n = len(a.tobytes())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            # Close the second wrapper deterministically; with closefd=False
            # this leaves the descriptor owned by ``f`` open.
            with self.open(f.fileno(), "r", closefd=False) as file:
                self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        with support.check_warnings(('', ResourceWarning)):
            f = self.FileIO(support.TESTFN, "wb")
            f.write(b"abcxxx")
            # Self-reference: the object can only be reclaimed by the
            # cycle collector.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise OSError()
        f.flush = bad_flush
        self.assertRaises(OSError, f.close) # exception not swallowed
        self.assertTrue(f.closed)

    def test_multi_close(self):
        # Repeated close() calls are harmless; using the file afterwards is not.
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")

    def test_types_have_dict(self):
        test = (
            self.IOBase(),
            self.RawIOBase(),
            self.TextIOBase(),
            self.StringIO(),
            self.BytesIO()
        )
        for obj in test:
            self.assertTrue(hasattr(obj, "__dict__"))

    def test_opener(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        fd = os.open(support.TESTFN, os.O_RDONLY)
        def opener(path, flags):
            # Ignore the requested path and hand back the descriptor
            # opened above.
            return fd
        with self.open("non-existent", "r", opener=opener) as f:
            self.assertEqual(f.read(), "egg\n")

    def test_fileio_closefd(self):
        # Issue #4841
        with self.open(__file__, 'rb') as f1, \
             self.open(__file__, 'rb') as f2:
            fileio = self.FileIO(f1.fileno(), closefd=False)
            # .__init__() must not close f1
            fileio.__init__(f2.fileno(), closefd=False)
            f1.readline()
            # .close() must not close f2
            fileio.close()
            f2.readline()
660
661
class CIOTest(IOTest):
    """IOTest plus a regression test that exists only in this subclass."""

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        obj.obj = obj
        wr = weakref.ref(obj)
        del MyIO
        del obj
        support.gc_collect()
        # assertIsNone reports the surviving object on failure, which
        # assertTrue(wr() is None) could not.
        self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000681
class PyIOTest(IOTest):
    """IOTest with no additional tests of its own."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000684
Guido van Rossuma9e20242007-03-08 00:43:48 +0000685
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom.
    # The class under test is taken from ``self.tp``; raw streams come from
    # ``self.MockRawIO`` and ``self.CloseFailureIO``.

    def test_detach(self):
        underlying = self.MockRawIO()
        wrapper = self.tp(underlying)
        # The first detach() hands back the very same raw object ...
        self.assertIs(wrapper.detach(), underlying)
        # ... after which there is nothing left to detach.
        with self.assertRaises(ValueError):
            wrapper.detach()

    def test_fileno(self):
        # The mock raw stream answers 42; the wrapper must pass it through.
        wrapper = self.tp(self.MockRawIO())

        self.assertEqual(42, wrapper.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        wrapper = self.tp(self.MockRawIO())
        # Invalid whence
        for bad_whence in (-1, 9):
            self.assertRaises(ValueError, wrapper.seek, 0, bad_whence)

    def test_override_destructor(self):
        base = self.tp
        calls = []

        class MyBufferedIO(base):
            def __del__(self):
                calls.append(1)
                # Chain to the parent's __del__ only when it defines one.
                parent_del = getattr(super(), "__del__", None)
                if parent_del is not None:
                    parent_del()

            def close(self):
                calls.append(2)
                super().close()

            def flush(self):
                calls.append(3)
                super().flush()

        wrapper = MyBufferedIO(self.MockRawIO())
        can_write = wrapper.writable()
        del wrapper
        support.gc_collect()
        # flush() is only expected to run for writable objects.
        expected = [1, 2, 3] if can_write else [1, 2]
        self.assertEqual(calls, expected)

    def test_context_manager(self):
        # Test usability as a context manager
        wrapper = self.tp(self.MockRawIO())

        def enter_and_exit():
            with wrapper:
                pass

        enter_and_exit()
        # wrapper should now be closed, and using it a second time should
        # raise a ValueError.
        self.assertRaises(ValueError, enter_and_exit)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        failing_raw = self.CloseFailureIO()

        def touch_missing_attribute():
            self.tp(failing_raw).xyzzy

        with support.captured_output("stderr") as captured:
            self.assertRaises(AttributeError, touch_missing_attribute)
        message = captured.getvalue().strip()
        if message:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(message.splitlines()), 1)
            self.assertTrue(message.startswith("Exception OSError: "), message)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200765 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000766 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000767
Antoine Pitrou716c4442009-05-23 19:04:03 +0000768 def test_repr(self):
769 raw = self.MockRawIO()
770 b = self.tp(raw)
771 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
772 self.assertEqual(repr(b), "<%s>" % clsname)
773 raw.name = "dummy"
774 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
775 raw.name = b"dummy"
776 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
777
Antoine Pitrou6be88762010-05-03 16:48:20 +0000778 def test_flush_error_on_close(self):
779 raw = self.MockRawIO()
780 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200781 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000782 raw.flush = bad_flush
783 b = self.tp(raw)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200784 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600785 self.assertTrue(b.closed)
786
787 def test_close_error_on_close(self):
788 raw = self.MockRawIO()
789 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200790 raise OSError('flush')
Benjamin Peterson68623612012-12-20 11:53:11 -0600791 def bad_close():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200792 raise OSError('close')
Benjamin Peterson68623612012-12-20 11:53:11 -0600793 raw.close = bad_close
794 b = self.tp(raw)
795 b.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200796 with self.assertRaises(OSError) as err: # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600797 b.close()
798 self.assertEqual(err.exception.args, ('close',))
799 self.assertEqual(err.exception.__context__.args, ('flush',))
800 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000801
802 def test_multi_close(self):
803 raw = self.MockRawIO()
804 b = self.tp(raw)
805 b.close()
806 b.close()
807 b.close()
808 self.assertRaises(ValueError, b.flush)
809
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000810 def test_unseekable(self):
811 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
812 self.assertRaises(self.UnsupportedOperation, bufio.tell)
813 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
814
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000815 def test_readonly_attributes(self):
816 raw = self.MockRawIO()
817 buf = self.tp(raw)
818 x = self.MockRawIO()
819 with self.assertRaises(AttributeError):
820 buf.raw = x
821
Guido van Rossum78892e42007-04-06 17:31:18 +0000822
class SizeofTest:
    # Mixin checking how sys.getsizeof() accounts for the internal buffer
    # of the buffered type under test (``self.tp``).

    @support.cpython_only
    def test_sizeof(self):
        # The size excluding the buffer, measured on one object, must
        # predict the total size of an object with a different buffer size.
        small, large = 4096, 8192
        small_bufio = self.tp(self.MockRawIO(), buffer_size=small)
        overhead = sys.getsizeof(small_bufio) - small
        large_bufio = self.tp(self.MockRawIO(), buffer_size=large)
        self.assertEqual(sys.getsizeof(large_bufio), overhead + large)

    @support.cpython_only
    def test_buffer_freeing(self):
        # After close() the reported size no longer includes the buffer.
        nbytes = 4096
        bufio = self.tp(self.MockRawIO(), buffer_size=nbytes)
        overhead = sys.getsizeof(bufio) - nbytes
        bufio.close()
        self.assertEqual(sys.getsizeof(bufio), overhead)
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200844
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Reader-side tests.  The type under test is ``self.tp``; the raw-stream
    # doubles (MockRawIO, MockFileIO, MisbehavedRawIO, MockUnseekableIO) and
    # ``open`` are looked up on ``self``.
    read_mode = "rb"

    def test_constructor(self):
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        # Re-initialising with valid arguments is allowed.
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        # Zero and negative buffer sizes are rejected.
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        # A later valid __init__() makes the object usable again.
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_read(self):
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        # rawio._reads is checked after each read1() call to track how many
        # raw reads have been issued so far.
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        # A short read only overwrites the start of the target buffer.
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")
        # Same, with the raw stream returning None after its data.
        rawio = self.MockRawIO((b"abc", None))
        bufio = self.tp(rawio)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"cb")

    def test_readlines(self):
        # Each assertion gets a fresh reader over the same three chunks.
        def bufio():
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes passed to bufio.read(), and the
        # read sizes expected in rawio.read_history.
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        # The raw mock itself: readall() returns None once only None is left.
        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())

    def test_read_past_eof(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(values)
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def reader():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            chunk = bufio.read(n)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as e:
                        # Record for the main thread, then let it propagate.
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=reader) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                # Chunk order is arbitrary, so only per-byte counts are
                # compared.
                combined = b''.join(results)
                for i in range(256):
                    self.assertEqual(combined.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        # Still unsupported once some data has been read.
        bufio.read(1)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        self.assertRaises(self.UnsupportedOperation, bufio.tell)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            rawio = self.MockRawIO([b"x" * n])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            # Simple case: one raw read is enough to satisfy the request.
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1040
1041
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the reader tests against io.BufferedReader and adds checks that
    # only this class defines.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After every failed __init__() the object must refuse to read.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(OSError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # Opening with "w+b" creates TESTFN; make sure it is removed again.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Self-reference: only the cycle collector can reclaim the object.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001082
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the inherited reader tests against pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001085
Guido van Rossuma9e20242007-03-08 00:43:48 +00001086
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Writer-side tests.  The type under test is ``self.tp``; the raw-stream
    # doubles (MockRawIO, MockNonBlockWriterIO, MisbehavedRawIO),
    # BlockingIOError and ``open`` are looked up on ``self``.
    write_mode = "wb"

    def test_constructor(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Re-initialising with valid arguments is allowed.
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        # Zero and negative buffer sizes are rejected.
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        # A later valid __init__() makes the object usable again.
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        # Nothing reaches the raw stream until detach() is called.
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # `intermediate_func` is called with the buffered object after every
        # single write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            # Clamp the final write to the bytes that are left.
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Both helpers move around and then return to the starting position.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        # Seeking away makes the pending "XY" visible in the raw stream.
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        l = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # Same as test_writelines, with a UserList instead of a list.
        l = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
        self.assertRaises(TypeError, bufio.writelines, None)
        self.assertRaises(TypeError, bufio.writelines, 'abc')

    def test_destructor(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        # Destroying the object pushed the pending bytes to the raw stream.
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # The test creates TESTFN; make sure it is removed afterwards.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            # truncate() leaves the stream position where it was.
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def writer():
                    try:
                        while True:
                            try:
                                chunk = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(chunk)
                    except Exception as e:
                        # Record for the main thread, then let it propagate.
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=writer) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as written_file:
                written = written_file.read()
            # Chunk order is arbitrary, so only per-byte counts are compared.
            for i in range(256):
                self.assertEqual(written.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)
        self.assertRaises(OSError, bufio.write, b"abcdef")

    def test_max_buffer_size_removal(self):
        # A third positional argument is rejected.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        raw = self.MockRawIO()
        def bad_write(b):
            raise OSError()
        raw.write = bad_write
        b = self.tp(raw)
        b.write(b'spam')
        self.assertRaises(OSError, b.close) # exception not swallowed
        # The object still ends up closed despite the failed write.
        self.assertTrue(b.closed)
1334
Benjamin Peterson59406a92009-03-26 17:10:29 +00001335
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the writer tests against io.BufferedWriter and adds checks that
    # only this class defines.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After every failed __init__() the object must refuse to write.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        # Opening with "w+b" creates TESTFN; make sure it is removed again.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Self-reference: only the cycle collector can reclaim the object.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as written_file:
            self.assertEqual(written_file.read(), b"123xxx")
1373
1374
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the inherited writer tests against pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001377
Guido van Rossum01a27522007-03-07 01:00:12 +00001378class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001379
def test_constructor(self):
    # A pair built from two raw streams starts out open.
    reader_raw = self.MockRawIO()
    writer_raw = self.MockRawIO()
    rw = self.tp(reader_raw, writer_raw)
    self.assertFalse(rw.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001383
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001384 def test_detach(self):
1385 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1386 self.assertRaises(self.UnsupportedOperation, pair.detach)
1387
Florent Xicluna109d5732012-07-07 17:03:22 +02001388 def test_constructor_max_buffer_size_removal(self):
1389 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001390 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001391
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001392 def test_constructor_with_not_readable(self):
1393 class NotReadable(MockRawIO):
1394 def readable(self):
1395 return False
1396
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001397 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001398
1399 def test_constructor_with_not_writeable(self):
1400 class NotWriteable(MockRawIO):
1401 def writable(self):
1402 return False
1403
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001404 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001405
1406 def test_read(self):
1407 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1408
1409 self.assertEqual(pair.read(3), b"abc")
1410 self.assertEqual(pair.read(1), b"d")
1411 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001412 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1413 self.assertEqual(pair.read(None), b"abc")
1414
1415 def test_readlines(self):
1416 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1417 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1418 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1419 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001420
1421 def test_read1(self):
1422 # .read1() is delegated to the underlying reader object, so this test
1423 # can be shallow.
1424 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1425
1426 self.assertEqual(pair.read1(3), b"abc")
1427
1428 def test_readinto(self):
1429 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1430
1431 data = bytearray(5)
1432 self.assertEqual(pair.readinto(data), 5)
1433 self.assertEqual(data, b"abcde")
1434
1435 def test_write(self):
1436 w = self.MockRawIO()
1437 pair = self.tp(self.MockRawIO(), w)
1438
1439 pair.write(b"abc")
1440 pair.flush()
1441 pair.write(b"def")
1442 pair.flush()
1443 self.assertEqual(w._write_stack, [b"abc", b"def"])
1444
1445 def test_peek(self):
1446 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1447
1448 self.assertTrue(pair.peek(3).startswith(b"abc"))
1449 self.assertEqual(pair.read(3), b"abc")
1450
1451 def test_readable(self):
1452 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1453 self.assertTrue(pair.readable())
1454
1455 def test_writeable(self):
1456 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1457 self.assertTrue(pair.writable())
1458
1459 def test_seekable(self):
1460 # BufferedRWPairs are never seekable, even if their readers and writers
1461 # are.
1462 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1463 self.assertFalse(pair.seekable())
1464
1465 # .flush() is delegated to the underlying writer object and has been
1466 # tested in the test_write method.
1467
1468 def test_close_and_closed(self):
1469 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1470 self.assertFalse(pair.closed)
1471 pair.close()
1472 self.assertTrue(pair.closed)
1473
1474 def test_isatty(self):
1475 class SelectableIsAtty(MockRawIO):
1476 def __init__(self, isatty):
1477 MockRawIO.__init__(self)
1478 self._isatty = isatty
1479
1480 def isatty(self):
1481 return self._isatty
1482
1483 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1484 self.assertFalse(pair.isatty())
1485
1486 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1487 self.assertTrue(pair.isatty())
1488
1489 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1490 self.assertTrue(pair.isatty())
1491
1492 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1493 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001494
# Runs the BufferedRWPairTest cases with io.BufferedRWPair as the class
# under test.
class CBufferedRWPairTest(BufferedRWPairTest):
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001497
# Runs the BufferedRWPairTest cases with pyio.BufferedRWPair as the class
# under test.
class PyBufferedRWPairTest(BufferedRWPairTest):
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001500
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001501
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Tests for read/write buffered objects.  The reader and writer suites
    # are inherited; the methods below add cases that mix reads, writes and
    # seeks on the same object.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Run the constructor checks of both parent suites explicitly.
        for suite in (BufferedReaderTest, BufferedWriterTest):
            suite.test_constructor(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        bufio = self.tp(raw, 8)

        self.assertEqual(b"as", bufio.read(2))
        for chunk in (b"ddd", b"eee"):
            bufio.write(chunk)
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", bufio.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        bufio = self.tp(raw)

        self.assertEqual(b"as", bufio.read(2))
        self.assertEqual(2, bufio.tell())
        bufio.seek(0, 0)
        self.assertEqual(b"asdf", bufio.read(4))

        bufio.write(b"123f")
        bufio.seek(0, 0)
        self.assertEqual(b"asdf123fl", bufio.read())
        self.assertEqual(9, bufio.tell())
        bufio.seek(-4, 2)
        self.assertEqual(5, bufio.tell())
        bufio.seek(2, 1)
        self.assertEqual(7, bufio.tell())
        self.assertEqual(b"fl", bufio.read(11))
        bufio.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        # A float offset is rejected.
        self.assertRaises(TypeError, bufio.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario: read_func(bufio[, n]) is the read primitive
        # being exercised around writes and flushes.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        # Modify the raw stream behind the buffered object's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        def _read(bufio, *args):
            return bufio.read(*args)
        self.check_flush_and_read(_read)

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # Without an explicit size, use a buffer larger than the data.
            if n >= 0:
                size = n
            else:
                size = 9999
            target = bytearray(size)
            filled = bufio.readinto(target)
            return bytes(target[:filled])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            data = bufio.peek(n)
            if n != -1:
                data = data[:n]
            # peek() does not advance the position; do it by hand.
            bufio.seek(len(data), 1)
            return data
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        for chunk in (b"123", b"45"):
            bufio.write(chunk)
            bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        for suite in (BufferedReaderTest, BufferedWriterTest):
            suite.test_threads(self)

    def test_writes_and_peek(self):
        def _peek_in_place(bufio):
            bufio.peek(1)
        self.check_writes(_peek_in_place)

        def _peek_one_back(bufio):
            # Step back one byte, peek, then restore the position.
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek_one_back)

    def test_writes_and_reads(self):
        def _reread_last(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_reread_last)

    def test_writes_and_read1s(self):
        def _reread_last(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_reread_last)

    def test_writes_and_readintos(self):
        def _reread_last(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_reread_last)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            end_pos = overwrite_size + 1
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), end_pos)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), end_pos)
            expected = (b"A" + b"B" * overwrite_size +
                        b"A" * (9 - overwrite_size))
            self.assertEqual(raw.getvalue(), expected)

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        original = b"\x80\x81\x82\x83\x84"
        size = len(original)
        for i in range(size):
            for j in range(i, size):
                raw = self.BytesIO(original)
                bufio = self.tp(raw, 100)
                mutate(bufio, i, j)
                bufio.flush()
                # The later write (at i) wins when i == j.
                expected = bytearray(original)
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        for suite in (BufferedReaderTest, BufferedWriterTest):
            suite.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as raw, self.tp(raw, 100) as f:
            f.write(b"1")
            self.assertEqual(f.read(1), b'b')
            f.write(b'2')
            self.assertEqual(f.read1(1), b'd')
            f.write(b'3')
            one_byte = bytearray(1)
            f.readinto(one_byte)
            self.assertEqual(one_byte, b'f')
            f.write(b'4')
            self.assertEqual(f.peek(1), b'h')
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as raw, self.tp(raw, 100) as f:
            self.assertEqual(f.read(1), b'a')
            f.write(b"2")
            self.assertEqual(f.read(1), b'c')
            f.flush()
            self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        with self.BytesIO(b'ab\ncdef\ng\n') as raw, self.tp(raw) as f:
            f.write(b'1')
            self.assertEqual(f.readline(), b'b\n')
            f.write(b'2')
            self.assertEqual(f.readline(), b'def\n')
            f.write(b'3')
            self.assertEqual(f.readline(), b'\n')
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None
1725
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    # Runs the BufferedRandomTest cases with io.BufferedRandom as the class
    # under test.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # A buffer of sys.maxsize bytes must be refused.
        refused = (OverflowError, MemoryError, ValueError)
        self.assertRaises(refused, bufio.__init__, rawio, sys.maxsize)

    def test_garbage_collection(self):
        for suite in (CBufferedReaderTest, CBufferedWriterTest):
            suite.test_garbage_collection(self)
1742
# Runs the BufferedRandomTest cases with pyio.BufferedRandom as the class
# under test.
class PyBufferedRandomTest(BufferedRandomTest):
    tp = pyio.BufferedRandom
1745
1746
# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
# properties:
#   - A single output character can correspond to many bytes of input.
#   - The number of input bytes to complete the character can be
#     undetermined until the last input byte is received.
#   - The number of input bytes can vary depending on previous input.
#   - A single input byte can correspond to many characters of output.
#   - The number of output characters can be undetermined until the
#     last input byte is received.
#   - The number of output characters can vary depending on previous input.
1757
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  The buffer is emptied after every
    word, so a new I applies to the bytes that follow the word that set
    it.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and an empty buffer."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags), with flags packing I and O."""
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        buffer, io = state
        self.buffer = bytearray(buffer)
        i, o = divmod(io, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Feed the bytes of input and return the text of completed words.

        When final is true, a partially buffered word is completed too.
        """
        # Collect the pieces and join once instead of growing a string.
        pieces = []
        period = ord('.')
        for b in input:
            if self.i == 0: # variable-length, terminated with period
                if b == period:
                    # A period with nothing buffered is an extra period.
                    if self.buffer:
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer: # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        Control words ('i...' and 'o...') update I or O and produce ''.
        The buffer must not be empty.
        """
        output = ''
        first, rest = self.buffer[0], self.buffer[1:]
        if first == ord('i'):
            self.i = min(99, int(rest or 0)) # set input length
        elif first == ord('o'):
            self.o = min(99, int(rest or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if self.o:
                # Pad with hyphens, then truncate, to exactly O characters.
                output = output.ljust(self.o, '-')[:self.o]
            output += '.'
        self.buffer = bytearray()
        return output

    # The codec lookup below only answers while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: return the 'test_decoder' CodecInfo.

        Returns None when the codec is disabled or name is another codec.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None
1842
# Register the lookup function of the decoder defined above as a codec
# search function.  The codec is disabled by default (codecEnabled is
# False); tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1846
1847
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for data, eof, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(data, eof), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001890
1891class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001892
def setUp(self):
    # Sample bytes using "\r\n", "\r" and "\n" line endings, and the same
    # text with every line ending written as "\n".
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
    # Start every test without a leftover scratch file.
    support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001897
def tearDown(self):
    # Remove the scratch file a test may have created.
    support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001900
def test_constructor(self):
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    text = self.TextIOWrapper(buffered)
    # Re-initialize the same wrapper with different settings and check
    # that each call takes effect.
    text.__init__(buffered, encoding="latin-1", newline="\r\n")
    self.assertEqual(text.encoding, "latin-1")
    self.assertEqual(text.line_buffering, False)
    text.__init__(buffered, encoding="utf-8", line_buffering=True)
    self.assertEqual(text.encoding, "utf-8")
    self.assertEqual(text.line_buffering, True)
    self.assertEqual("\xe9\n", text.readline())
    # Invalid newline arguments: wrong type, then wrong value.
    self.assertRaises(TypeError, text.__init__, buffered, newline=42)
    self.assertRaises(ValueError, text.__init__, buffered, newline='xyzzy')
1914
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)
    text = self.TextIOWrapper(buffered)
    # detach() hands back the wrapped buffer object itself.
    self.assertIs(text.detach(), buffered)

    text = self.TextIOWrapper(buffered, encoding="ascii")
    text.write("howdy")
    # Nothing has reached the raw stream yet.
    self.assertFalse(raw.getvalue())
    text.detach()
    self.assertEqual(raw.getvalue(), b"howdy")
    # A second detach() on the same wrapper is an error.
    self.assertRaises(ValueError, text.detach)
1927
def test_repr(self):
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    text = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    # No name, no mode.
    expected = "<%s.TextIOWrapper encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    # A name set on the raw stream shows up in the wrapper's repr.
    raw.name = "dummy"
    expected = "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    text.mode = "r"
    expected = ("<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>"
                % modname)
    self.assertEqual(repr(text), expected)

    # A bytes name is shown with its b'' prefix.
    raw.name = b"dummy"
    expected = ("<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>"
                % modname)
    self.assertEqual(repr(text), expected)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001944
def test_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    text = self.TextIOWrapper(buffered, newline="\n", line_buffering=True)
    # Each step is (text to write, expected raw contents afterwards).
    steps = (
        ("X", b""),                # No flush happened
        ("Y\nZ", b"XY\nZ"),        # All got flushed
        ("A\rB", b"XY\nZA\rB"),
    )
    for chunk, flushed in steps:
        text.write(chunk)
        self.assertEqual(raw.getvalue(), flushed)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001955
def test_default_encoding(self):
    saved_environ = dict(os.environ)
    try:
        # try to get a user preferred encoding different than the current
        # locale encoding to check that TextIOWrapper() uses the current
        # locale encoding and not the user preferred encoding
        for name in ('LC_ALL', 'LANG', 'LC_CTYPE'):
            os.environ.pop(name, None)

        expected = locale.getpreferredencoding(False)
        raw = self.BytesIO()
        text = self.TextIOWrapper(raw)
        self.assertEqual(text.encoding, expected)
    finally:
        # Put the environment back exactly as it was.
        os.environ.clear()
        os.environ.update(saved_environ)
1973
# Issue 15989
def test_device_encoding(self):
    raw = self.BytesIO()
    # A file descriptor just past INT_MAX or UINT_MAX must make the
    # constructor raise OverflowError.
    for limit in (_testcapi.INT_MAX, _testcapi.UINT_MAX):
        raw.fileno = lambda limit=limit: limit + 1
        self.assertRaises(OverflowError, self.TextIOWrapper, raw)
1981
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="utf-8")
    self.assertEqual(t.encoding, "utf-8")
    # Without an explicit encoding the attribute must still be set ...
    t = self.TextIOWrapper(b)
    self.assertIsNotNone(t.encoding)
    # ... and name a codec that exists (lookup raises LookupError if not).
    codecs.lookup(t.encoding)
1990
def test_encoding_errors_reading(self):
    def reader(**options):
        # A fresh ASCII wrapper over data containing one non-ASCII byte.
        raw = self.BytesIO(b"abc\n\xff\n")
        return self.TextIOWrapper(raw, encoding="ascii", **options)

    # (1) default
    self.assertRaises(UnicodeError, reader().read)
    # (2) explicit strict
    self.assertRaises(UnicodeError, reader(errors="strict").read)
    # (3) ignore
    self.assertEqual(reader(errors="ignore").read(), "abc\n\n")
    # (4) replace
    self.assertEqual(reader(errors="replace").read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002008
def test_encoding_errors_writing(self):
    def writer(**options):
        # Return a fresh raw stream and an ASCII wrapper around it.
        raw = self.BytesIO()
        return raw, self.TextIOWrapper(raw, encoding="ascii", **options)

    # (1) default
    raw, text = writer()
    self.assertRaises(UnicodeError, text.write, "\xff")
    # (2) explicit strict
    raw, text = writer(errors="strict")
    self.assertRaises(UnicodeError, text.write, "\xff")
    # (3) ignore
    raw, text = writer(errors="ignore", newline="\n")
    text.write("abc\xffdef\n")
    text.flush()
    self.assertEqual(raw.getvalue(), b"abcdef\n")
    # (4) replace
    raw, text = writer(errors="replace", newline="\n")
    text.write("abc\xffdef\n")
    text.flush()
    self.assertEqual(raw.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002032
def test_newlines(self):
    input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]

    # Each entry is [newline argument, lines expected when reading].
    tests = [
        [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
        [ '', input_lines ],
        [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
        [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
        [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )
    joined = ''.join(input_lines)

    def lines_via_read(textio):
        # Read two characters, then the rest of that line, until EOF.
        lines = []
        while True:
            c2 = textio.read(2)
            if c2 == '':
                return lines
            self.assertEqual(len(c2), 2)
            lines.append(c2 + textio.readline())

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        data = joined.encode(encoding)
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    if do_reads:
                        got_lines = lines_via_read(textio)
                    else:
                        got_lines = list(textio)

                    for got_line, exp_line in zip(got_lines, exp_lines):
                        self.assertEqual(got_line, exp_line)
                    self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002074
def test_newlines_input(self):
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    # Each entry is (newline argument, lines expected from readlines()).
    cases = [
        (None, normalized.decode("ascii").splitlines(keepends=True)),
        ("", testdata.decode("ascii").splitlines(keepends=True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, expected in cases:
        raw = self.BytesIO(testdata)
        text = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        self.assertEqual(text.readlines(), expected)
        # Reading everything at once must give the same text.
        text.seek(0)
        self.assertEqual(text.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002090
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002091 def test_newlines_output(self):
2092 testdict = {
2093 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2094 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2095 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2096 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2097 }
2098 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2099 for newline, expected in tests:
2100 buf = self.BytesIO()
2101 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2102 txt.write("AAA\nB")
2103 txt.write("BB\nCCC\n")
2104 txt.write("X\rY\r\nZ")
2105 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002106 self.assertEqual(buf.closed, False)
2107 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002108
def test_destructor(self):
    # Dropping the only reference to a wrapper must close the underlying
    # buffer with the pending text already written into it.
    seen_at_close = []
    parent = self.BytesIO

    class RecordingBytesIO(parent):
        def close(self):
            # Record the contents first; close() discards them.
            seen_at_close.append(self.getvalue())
            parent.close(self)

    buf = RecordingBytesIO()
    wrapper = self.TextIOWrapper(buf, encoding="ascii")
    wrapper.write("abc")
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002122
def test_override_destructor(self):
    # A subclass overriding __del__, close() and flush() must see them
    # invoked in exactly that order when the object is collected.
    calls = []

    class TracingTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # Chain to the parent finalizer only if there is one.
            parent_del = getattr(super(), "__del__", None)
            if parent_del is not None:
                parent_del()

        def close(self):
            calls.append(2)
            super().close()

        def flush(self):
            calls.append(3)
            super().flush()

    raw = self.BytesIO()
    wrapper = TracingTextIO(raw, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2145
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    failing_raw = self.CloseFailureIO()

    def touch_missing_attribute():
        # The temporary wrapper is discarded while AttributeError propagates.
        self.TextIOWrapper(failing_raw).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    message = captured.getvalue().strip()
    if not message:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(message.splitlines()), 1)
    self.assertTrue(message.startswith("Exception OSError: "), message)
    self.assertTrue(message.endswith(" ignored"), message)
Guido van Rossum8358db22007-08-18 21:39:55 +00002160
Guido van Rossum9b76da62007-04-11 01:09:03 +00002161 # Systematic tests of the text I/O API
2162
def test_basic_io(self):
    # Round-trip a short string through a text file for a range of chunk
    # sizes and encodings, checking read/seek/tell/write along the way.
    # Files are opened with ``with`` so that a failing assertion does not
    # leave the test file open.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin-1", "utf-8":  # , "utf-16-be", "utf-16-le":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position at end of file, reused as a seek target below.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                # The multi-line sample contains non-Latin-1 characters, so
                # it only runs for the UTF encodings.
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2191
def multi_line_test(self, f, enc):
    # Write lines of many different lengths, recording tell() before each
    # one, then read them back and require the same (position, line) pairs.
    # ``enc`` is accepted but not used here.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    written = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to fill ``size`` columns.
        text = "".join(sample[k % len(sample)] for k in range(size)) + "\n"
        written.append((f.tell(), text))
        f.write(text)
    f.seek(0)
    seen = []
    while True:
        offset = f.tell()
        text = f.readline()
        if not text:
            break
        seen.append((offset, text))
    self.assertEqual(seen, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002213
def test_telling(self):
    # tell() must report the same positions while reading as it did while
    # writing, and must refuse to work in the middle of line iteration.
    # The file is opened with ``with`` so that it is closed even when an
    # assertion fails.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # Inside the iteration tell() is expected to raise.
            self.assertRaises(OSError, f.tell)
        # Once the iteration is exhausted tell() works again.
        self.assertEqual(f.tell(), p2)
2233
def test_seeking(self):
    # Build lines whose ASCII head is two characters shorter than the value
    # returned by _default_chunk_size(), followed by a multi-byte character,
    # then check tell() and readline() after reading exactly the head.
    head_len = _default_chunk_size() - 2
    head_text = "a" * head_len
    head_bytes = head_text.encode("utf-8")
    self.assertEqual(len(head_text), len(head_bytes))
    tail_text = "\u8888\n"
    record = head_bytes + tail_text.encode("utf-8")
    with self.open(support.TESTFN, "wb") as f:
        f.write(record * 2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        self.assertEqual(f.read(head_len), str(head_bytes, "ascii"))
        self.assertEqual(f.tell(), head_len)
        self.assertEqual(f.readline(), tail_text)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002250
def test_seeking_too(self):
    # Regression test for a specific bug
    payload = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as writer:
        writer.write(payload)
    with self.open(support.TESTFN, "r", encoding="utf-8") as reader:
        reader._CHUNK_SIZE  # Just test that it exists
        # A chunk smaller than the 3-byte character in the payload.
        reader._CHUNK_SIZE = 2
        reader.readline()
        # Passing means tell() returned without raising.
        reader.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002261
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened with ``with`` so that a failing assertion
        # cannot leave the test file open.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN, encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    # Seeking back to the cookie must replay the same data.
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for case_input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(case_input)

        # Position each test case so that it crosses a chunk boundary.
        for case_input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(case_input)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + case_input, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002308
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002309 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002310 data = "1234567890"
2311 tests = ("utf-16",
2312 "utf-16-le",
2313 "utf-16-be",
2314 "utf-32",
2315 "utf-32-le",
2316 "utf-32-be")
2317 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002318 buf = self.BytesIO()
2319 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002320 # Check if the BOM is written only once (see issue1753).
2321 f.write(data)
2322 f.write(data)
2323 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002324 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002325 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002326 self.assertEqual(f.read(), data * 2)
2327 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002328
Benjamin Petersona1b49012009-03-31 23:11:32 +00002329 def test_unreadable(self):
2330 class UnReadable(self.BytesIO):
2331 def readable(self):
2332 return False
2333 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002334 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00002335
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002336 def test_read_one_by_one(self):
2337 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002338 reads = ""
2339 while True:
2340 c = txt.read(1)
2341 if not c:
2342 break
2343 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002344 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002345
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002346 def test_readlines(self):
2347 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2348 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2349 txt.seek(0)
2350 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2351 txt.seek(0)
2352 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2353
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002354 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002355 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002356 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002357 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002358 reads = ""
2359 while True:
2360 c = txt.read(128)
2361 if not c:
2362 break
2363 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002364 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002365
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002366 def test_writelines(self):
2367 l = ['ab', 'cd', 'ef']
2368 buf = self.BytesIO()
2369 txt = self.TextIOWrapper(buf)
2370 txt.writelines(l)
2371 txt.flush()
2372 self.assertEqual(buf.getvalue(), b'abcdef')
2373
2374 def test_writelines_userlist(self):
2375 l = UserList(['ab', 'cd', 'ef'])
2376 buf = self.BytesIO()
2377 txt = self.TextIOWrapper(buf)
2378 txt.writelines(l)
2379 txt.flush()
2380 self.assertEqual(buf.getvalue(), b'abcdef')
2381
2382 def test_writelines_error(self):
2383 txt = self.TextIOWrapper(self.BytesIO())
2384 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2385 self.assertRaises(TypeError, txt.writelines, None)
2386 self.assertRaises(TypeError, txt.writelines, b'abc')
2387
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002388 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002389 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002390
2391 # read one char at a time
2392 reads = ""
2393 while True:
2394 c = txt.read(1)
2395 if not c:
2396 break
2397 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002398 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002399
2400 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002401 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002402 txt._CHUNK_SIZE = 4
2403
2404 reads = ""
2405 while True:
2406 c = txt.read(4)
2407 if not c:
2408 break
2409 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002410 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002411
2412 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002413 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002414 txt._CHUNK_SIZE = 4
2415
2416 reads = txt.read(4)
2417 reads += txt.read(4)
2418 reads += txt.readline()
2419 reads += txt.readline()
2420 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002421 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002422
2423 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002424 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002425 txt._CHUNK_SIZE = 4
2426
2427 reads = txt.read(4)
2428 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002429 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002430
2431 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002432 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002433 txt._CHUNK_SIZE = 4
2434
2435 reads = txt.read(4)
2436 pos = txt.tell()
2437 txt.seek(0)
2438 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002439 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002440
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002441 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002442 buffer = self.BytesIO(self.testdata)
2443 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002444
2445 self.assertEqual(buffer.seekable(), txt.seekable())
2446
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    path = support.TESTFN
    for codec in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=codec) as writer:
            writer.write('aaa')
            # tell() is called as before; its result is not needed here.
            writer.tell()
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), 'aaa'.encode(codec))

        with self.open(path, 'a', encoding=codec) as appender:
            appender.write('xxx')
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), 'aaaxxx'.encode(codec))
Antoine Pitroue4501852009-05-14 18:55:55 +00002461
def test_seek_bom(self):
    # Same test, but when seeking manually
    path = support.TESTFN
    for codec in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=codec) as writer:
            writer.write('aaa')
            end_of_text = writer.tell()
        with self.open(path, 'r+', encoding=codec) as stream:
            # Write past the existing text first, then overwrite it from
            # the start of the file.
            stream.seek(end_of_text)
            stream.write('zzz')
            stream.seek(0)
            stream.write('bbb')
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), 'bbbzzz'.encode(codec))
Antoine Pitroue4501852009-05-14 18:55:55 +00002476
def test_errors_property(self):
    # ``errors`` reads "strict" when not given, otherwise the value passed.
    cases = (({}, "strict"), ({"errors": "replace"}, "replace"))
    for extra, expected in cases:
        with self.open(support.TESTFN, "w", **extra) as f:
            self.assertEqual(f.errors, expected)
2482
@support.no_tracing
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    worker_count = 20
    template = "Thread%03d\n"
    go = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def write_line(index):
            text = template % index
            # Hold every worker until the main thread releases them all.
            go.wait()
            f.write(text)

        workers = [threading.Thread(target=write_line, args=(index,))
                   for index in range(worker_count)]
        for worker in workers:
            worker.start()
        time.sleep(0.02)
        go.set()
        for worker in workers:
            worker.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
        # Each worker's line must appear exactly once.
        for index in range(worker_count):
            self.assertEqual(content.count(template % index), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002505
Antoine Pitrou6be88762010-05-03 16:48:20 +00002506 def test_flush_error_on_close(self):
2507 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2508 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002509 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00002510 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002511 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06002512 self.assertTrue(txt.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00002513
2514 def test_multi_close(self):
2515 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2516 txt.close()
2517 txt.close()
2518 txt.close()
2519 self.assertRaises(ValueError, txt.flush)
2520
def test_unseekable(self):
    # tell() and seek() on a wrapper around MockUnseekableIO must raise
    # UnsupportedOperation.
    wrapper = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    unsupported = self.UnsupportedOperation
    self.assertRaises(unsupported, wrapper.tell)
    self.assertRaises(unsupported, wrapper.seek, 0)
2525
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002526 def test_readonly_attributes(self):
2527 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2528 buf = self.BytesIO(self.testdata)
2529 with self.assertRaises(AttributeError):
2530 txt.buffer = buf
2531
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    wrapper = self.TextIOWrapper(self.MockRawIO(chunks),
                                 encoding='ascii', newline='\n')
    # Reads
    self.assertEqual(wrapper.read(4), 'abcd')
    self.assertEqual(wrapper.readline(), 'efghi\n')
    self.assertEqual(list(wrapper), ['jkl\n', 'opq\n'])
2542
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                                 write_through=True)
    for piece in ('1', '23\n4', '5'):
        wrapper.write(piece)
    # No flush() call: the data must already have reached the raw object.
    self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2552
class CTextIOWrapperTest(TextIOWrapperTest):

    def test_initialization(self):
        # Re-running __init__ with an invalid ``newline`` must fail, and
        # every read() attempted afterwards must raise ValueError.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        for error, bad_newline in ((TypeError, 42), (ValueError, 'xyzzy')):
            self.assertRaises(error, wrapper.__init__, buffered,
                              newline=bad_newline)
            self.assertRaises(ValueError, wrapper.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        rawio = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(rawio)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        # Self-reference: the object is now part of a reference cycle.
        wrapper.x = wrapper
        ref = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertTrue(ref() is None, ref)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references
            text_a.buddy = text_b
            text_b.buddy = text_a
        support.gc_collect()
2593
2594
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Adds nothing of its own: every test comes from TextIOWrapperTest.
    pass
2597
2598
class IncrementalNewlineDecoderTest(unittest.TestCase):

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def expect(data, text, **kwargs):
            # We exercise getstate() / setstate() as well as decode():
            # the same input is decoded twice from the same saved state.
            saved = decoder.getstate()
            self.assertEqual(decoder.decode(data, **kwargs), text)
            decoder.setstate(saved)
            self.assertEqual(decoder.decode(data, **kwargs), text)

        # A three-byte character fed whole, then twice byte by byte, then
        # left dangling after its first byte.
        multibyte_steps = (
            (b'\xe8\xa2\x88', "\u8888"),
            (b'\xe8', ""),
            (b'\xa2', ""),
            (b'\x88', "\u8888"),
            (b'\xe8', ""),
            (b'\xa2', ""),
            (b'\x88', "\u8888"),
            (b'\xe8', ""),
        )
        for data, text in multibyte_steps:
            expect(data, text)
        # The dangling lead byte is an error once the input is final.
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        # The steps are order-dependent: a trailing "\r" is held back until
        # the next call shows whether "\n" follows.
        newline_steps = (
            (b'\n', "\n", {}),
            (b'\r', "", {}),
            (b'', "\n", {'final': True}),
            (b'\r', "\n", {'final': True}),

            (b'\r', "", {}),
            (b'a', "\na", {}),

            (b'\r\r\n', "\n\n", {}),
            (b'\r', "", {}),
            (b'\r', "\n", {}),
            (b'\na', "\na", {}),

            (b'\xe8\xa2\x88\r\n', "\u8888\n", {}),
            (b'\xe8\xa2\x88', "\u8888", {}),
            (b'\n', "\n", {}),
            (b'\xe8\xa2\x88\r', "\u8888", {}),
            (b'\n', "\n", {}),
        )
        for data, text, options in newline_steps:
            expect(data, text, **options)

    def check_newline_decoding(self, decoder, encoding):
        # Feed text through ``decoder`` one unit at a time and check both
        # the translated output and the ``newlines`` attribute on the way.
        decoded = []
        if encoding is None:
            encoder = None

            def feed(text):
                # Decode one char at a time
                for char in text:
                    decoded.append(decoder.decode(char))
        else:
            encoder = codecs.getincrementalencoder(encoding)()

            def feed(text):
                # Decode one byte at a time
                for byte in encoder.encode(text):
                    decoded.append(decoder.decode(bytes([byte])))

        self.assertEqual(decoder.newlines, None)
        feed("abc\n\r")
        self.assertEqual(decoder.newlines, '\n')
        feed("\nabc")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        feed("abc\r")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        feed("abc")
        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
        feed("abc\r")
        self.assertEqual("".join(decoded), "abc\n\nabcabc\nabcabc")
        # After reset() the decoder must have forgotten the newlines seen.
        decoder.reset()
        payload = "abc"
        if encoder is not None:
            encoder.reset()
            payload = encoder.encode(payload)
        self.assertEqual(decoder.decode(payload), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            inner = codecs.getincrementaldecoder(enc)() if enc else None
            wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(wrapped, enc)
        inner = codecs.getincrementaldecoder("utf-8")()
        wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding_utf8(wrapped)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        # U+0D00 and U+0A00 must pass through unchanged and must not be
        # recorded as newlines, with or without translation.
        for translate in (False, True):
            dec = self.IncrementalNewlineDecoder(None, translate=translate)
            self.assertEqual(dec.newlines, None)
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)
2704
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own: every test comes from
    # IncrementalNewlineDecoderTest.
    pass
2707
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own: every test comes from
    # IncrementalNewlineDecoderTest.
    pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002710
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002711
Guido van Rossum01a27522007-03-07 01:00:12 +00002712# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002713
Guido van Rossum5abbf752007-08-27 17:39:33 +00002714class MiscIOTest(unittest.TestCase):
2715
def tearDown(self):
    # Remove the scratch file after each test.
    scratch = support.TESTFN
    support.unlink(scratch)
2718
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002719 def test___all__(self):
2720 for name in self.io.__all__:
2721 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002722 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002723 if name == "open":
2724 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002725 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002726 self.assertTrue(issubclass(obj, Exception), name)
2727 elif not name.startswith("SEEK_"):
2728 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002729
Barry Warsaw40e82462008-11-20 20:14:50 +00002730 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002731 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002732 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002733 f.close()
2734
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002735 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002736 self.assertEqual(f.name, support.TESTFN)
2737 self.assertEqual(f.buffer.name, support.TESTFN)
2738 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2739 self.assertEqual(f.mode, "U")
2740 self.assertEqual(f.buffer.mode, "rb")
2741 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002742 f.close()
2743
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002744 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002745 self.assertEqual(f.mode, "w+")
2746 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2747 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002748
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002749 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002750 self.assertEqual(g.mode, "wb")
2751 self.assertEqual(g.raw.mode, "wb")
2752 self.assertEqual(g.name, f.fileno())
2753 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00002754 f.close()
2755 g.close()
2756
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002757 def test_io_after_close(self):
2758 for kwargs in [
2759 {"mode": "w"},
2760 {"mode": "wb"},
2761 {"mode": "w", "buffering": 1},
2762 {"mode": "w", "buffering": 2},
2763 {"mode": "wb", "buffering": 0},
2764 {"mode": "r"},
2765 {"mode": "rb"},
2766 {"mode": "r", "buffering": 1},
2767 {"mode": "r", "buffering": 2},
2768 {"mode": "rb", "buffering": 0},
2769 {"mode": "w+"},
2770 {"mode": "w+b"},
2771 {"mode": "w+", "buffering": 1},
2772 {"mode": "w+", "buffering": 2},
2773 {"mode": "w+b", "buffering": 0},
2774 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002775 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002776 f.close()
2777 self.assertRaises(ValueError, f.flush)
2778 self.assertRaises(ValueError, f.fileno)
2779 self.assertRaises(ValueError, f.isatty)
2780 self.assertRaises(ValueError, f.__iter__)
2781 if hasattr(f, "peek"):
2782 self.assertRaises(ValueError, f.peek, 1)
2783 self.assertRaises(ValueError, f.read)
2784 if hasattr(f, "read1"):
2785 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02002786 if hasattr(f, "readall"):
2787 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002788 if hasattr(f, "readinto"):
2789 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2790 self.assertRaises(ValueError, f.readline)
2791 self.assertRaises(ValueError, f.readlines)
2792 self.assertRaises(ValueError, f.seek, 0)
2793 self.assertRaises(ValueError, f.tell)
2794 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002795 self.assertRaises(ValueError, f.write,
2796 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002797 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002798 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002799
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002800 def test_blockingioerror(self):
2801 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002802 class C(str):
2803 pass
2804 c = C("")
2805 b = self.BlockingIOError(1, c)
2806 c.b = b
2807 b.c = c
2808 wr = weakref.ref(c)
2809 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002810 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002811 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002812
2813 def test_abcs(self):
2814 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002815 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2816 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2817 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2818 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002819
2820 def _check_abc_inheritance(self, abcmodule):
2821 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002822 self.assertIsInstance(f, abcmodule.IOBase)
2823 self.assertIsInstance(f, abcmodule.RawIOBase)
2824 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2825 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002826 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002827 self.assertIsInstance(f, abcmodule.IOBase)
2828 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2829 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2830 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002831 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002832 self.assertIsInstance(f, abcmodule.IOBase)
2833 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2834 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2835 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002836
2837 def test_abc_inheritance(self):
2838 # Test implementations inherit from their respective ABCs
2839 self._check_abc_inheritance(self)
2840
2841 def test_abc_inheritance_official(self):
2842 # Test implementations inherit from the official ABCs of the
2843 # baseline "io" module.
2844 self._check_abc_inheritance(io)
2845
Antoine Pitroue033e062010-10-29 10:38:18 +00002846 def _check_warn_on_dealloc(self, *args, **kwargs):
2847 f = open(*args, **kwargs)
2848 r = repr(f)
2849 with self.assertWarns(ResourceWarning) as cm:
2850 f = None
2851 support.gc_collect()
2852 self.assertIn(r, str(cm.warning.args[0]))
2853
2854 def test_warn_on_dealloc(self):
2855 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
2856 self._check_warn_on_dealloc(support.TESTFN, "wb")
2857 self._check_warn_on_dealloc(support.TESTFN, "w")
2858
2859 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
2860 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00002861 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00002862 for fd in fds:
2863 try:
2864 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02002865 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00002866 if e.errno != errno.EBADF:
2867 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00002868 self.addCleanup(cleanup_fds)
2869 r, w = os.pipe()
2870 fds += r, w
2871 self._check_warn_on_dealloc(r, *args, **kwargs)
2872 # When using closefd=False, there's no warning
2873 r, w = os.pipe()
2874 fds += r, w
2875 with warnings.catch_warnings(record=True) as recorded:
2876 open(r, *args, closefd=False, **kwargs)
2877 support.gc_collect()
2878 self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00002879
2880 def test_warn_on_dealloc_fd(self):
2881 self._check_warn_on_dealloc_fd("rb", buffering=0)
2882 self._check_warn_on_dealloc_fd("rb")
2883 self._check_warn_on_dealloc_fd("r")
2884
2885
Antoine Pitrou243757e2010-11-05 21:15:39 +00002886 def test_pickling(self):
2887 # Pickling file objects is forbidden
2888 for kwargs in [
2889 {"mode": "w"},
2890 {"mode": "wb"},
2891 {"mode": "wb", "buffering": 0},
2892 {"mode": "r"},
2893 {"mode": "rb"},
2894 {"mode": "rb", "buffering": 0},
2895 {"mode": "w+"},
2896 {"mode": "w+b"},
2897 {"mode": "w+b", "buffering": 0},
2898 ]:
2899 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
2900 with self.open(support.TESTFN, **kwargs) as f:
2901 self.assertRaises(TypeError, pickle.dumps, f, protocol)
2902
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01002903 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2904 def test_nonblock_pipe_write_bigbuf(self):
2905 self._test_nonblock_pipe_write(16*1024)
2906
2907 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2908 def test_nonblock_pipe_write_smallbuf(self):
2909 self._test_nonblock_pipe_write(1024)
2910
2911 def _set_non_blocking(self, fd):
2912 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
2913 self.assertNotEqual(flags, -1)
2914 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
2915 self.assertEqual(res, 0)
2916
2917 def _test_nonblock_pipe_write(self, bufsize):
2918 sent = []
2919 received = []
2920 r, w = os.pipe()
2921 self._set_non_blocking(r)
2922 self._set_non_blocking(w)
2923
2924 # To exercise all code paths in the C implementation we need
2925 # to play with buffer sizes. For instance, if we choose a
2926 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
2927 # then we will never get a partial write of the buffer.
2928 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
2929 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
2930
2931 with rf, wf:
2932 for N in 9999, 73, 7574:
2933 try:
2934 i = 0
2935 while True:
2936 msg = bytes([i % 26 + 97]) * N
2937 sent.append(msg)
2938 wf.write(msg)
2939 i += 1
2940
2941 except self.BlockingIOError as e:
2942 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01002943 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01002944 sent[-1] = sent[-1][:e.characters_written]
2945 received.append(rf.read())
2946 msg = b'BLOCKED'
2947 wf.write(msg)
2948 sent.append(msg)
2949
2950 while True:
2951 try:
2952 wf.flush()
2953 break
2954 except self.BlockingIOError as e:
2955 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01002956 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01002957 self.assertEqual(e.characters_written, 0)
2958 received.append(rf.read())
2959
2960 received += iter(rf.read, None)
2961
2962 sent, received = b''.join(sent), b''.join(received)
2963 self.assertTrue(sent == received)
2964 self.assertTrue(wf.closed)
2965 self.assertTrue(rf.closed)
2966
Charles-François Natalidc3044c2012-01-09 22:40:02 +01002967 def test_create_fail(self):
2968 # 'x' mode fails if file is existing
2969 with self.open(support.TESTFN, 'w'):
2970 pass
2971 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
2972
2973 def test_create_writes(self):
2974 # 'x' mode opens for writing
2975 with self.open(support.TESTFN, 'xb') as f:
2976 f.write(b"spam")
2977 with self.open(support.TESTFN, 'rb') as f:
2978 self.assertEqual(b"spam", f.read())
2979
Christian Heimes7b648752012-09-10 14:48:43 +02002980 def test_open_allargs(self):
2981 # there used to be a buffer overflow in the parser for rawmode
2982 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
2983
2984
class CMiscIOTest(MiscIOTest):
    """Run the MiscIOTest suite against the ``io`` module."""

    # Module under test, read by the base class as ``self.io``.
    io = io
2987
class PyMiscIOTest(MiscIOTest):
    """Run the MiscIOTest suite against the ``pyio`` module."""

    # Module under test, read by the base class as ``self.io``.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002990
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002991
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    # Tests for the interaction between SIGALRM and blocking reads/writes.
    #
    # This base class is not run directly; subclasses set the ``io``
    # attribute to the module under test.  Each helper arms a one-second
    # alarm and disarms it again in its ``finally`` clause, so a failing
    # test cannot leave a signal pending.

    def setUp(self):
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        # Cancel any alarm still pending before restoring the old handler.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Default handler: raises ZeroDivisionError in the interrupted code.
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        *item* is the unit of data that gets repeated to fill the pipe,
        *bytes* is what is expected on the reading end, and
        *fdopen_kwargs* are forwarded to open()."""
        read_results = []
        def _read():
            # Keep SIGALRM away from the reader thread where possible.
            if hasattr(signal, 'pthread_sigmask'):
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # Stays None if open() fails, so that cleanup does not raise a
        # NameError that would hide the original exception.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            signal.alarm(1)
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed above.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            self.assertRaises(ZeroDivisionError,
                        wio.write, item * (support.PIPE_MAX_SIZE // len(item)))
            t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            # Disarm the alarm so it cannot fire during cleanup or later.
            signal.alarm(0)
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check the outcome of writing to a file from a signal handler
        that interrupts a write to that same file.

        Either the nested write fails with a "reentrant call" RuntimeError
        or the handler's own ZeroDivisionError propagates."""
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            # Disarm the alarm so it cannot fire during cleanup or later.
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *decode* converts the value returned by read() to str."""
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            # Supply the second half of the data from inside the handler.
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Stays None if open() fails, so that cleanup does not raise a
        # NameError that would hide the original exception.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            # Disarm the alarm so it cannot fire during cleanup or later.
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *item* is a single character or byte that gets repeated N times."""
        select = support.import_module("select")
        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = 1024 * 1024
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        def _read():
            while not write_finished:
                while r in select.select([r], [], [], 1.0)[0]:
                    s = os.read(r, 1024)
                    read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: install the second handler and re-arm.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            # Second alarm: start the reader so the write can complete.
            t.start()
        signal.signal(signal.SIGALRM, alarm1)
        # Stays None if open() fails, so that cleanup does not raise a
        # NameError that would hide the original exception.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            self.assertEqual(N, wio.write(item * N))
            wio.flush()
            write_finished = True
            t.join()
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            # Disarm the alarm so it cannot fire during cleanup or later.
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3181
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003182
class CSignalsTest(SignalsTest):
    """Run the SignalsTest suite against the ``io`` module."""

    # Module under test, read by the base class as ``self.io``.
    io = io
3185
class PySignalsTest(SignalsTest):
    """Run the SignalsTest suite against the ``pyio`` module."""

    # Module under test, read by the base class as ``self.io``.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = test_reentrant_write_text = None
3193
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003194
def test_main():
    """Inject the io / pyio namespaces into the test classes and run them."""
    tests = (
        CIOTest, PyIOTest,
        CBufferedReaderTest, PyBufferedReaderTest,
        CBufferedWriterTest, PyBufferedWriterTest,
        CBufferedRWPairTest, PyBufferedRWPairTest,
        CBufferedRandomTest, PyBufferedRandomTest,
        StatefulIncrementalDecoderTest,
        CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
        CTextIOWrapperTest, PyTextIOWrapperTest,
        CMiscIOTest, PyMiscIOTest,
        CSignalsTest, PySignalsTest,
    )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {}
    py_io_ns = {}
    for member in all_members:
        c_io_ns[member] = getattr(io, member)
    for member in all_members:
        py_io_ns[member] = getattr(pyio, member)

    # Each mock has a "C"- and a "Py"-prefixed variant at module level; it is
    # exposed to the tests under the unprefixed name.
    module_globals = globals()
    for mock in mocks:
        c_io_ns[mock.__name__] = module_globals["C" + mock.__name__]
    for mock in mocks:
        py_io_ns[mock.__name__] = module_globals["Py" + mock.__name__]

    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper

    for test in tests:
        # The class name prefix selects the namespace; classes with neither
        # prefix are left untouched.
        if test.__name__.startswith("C"):
            namespace = c_io_ns
        elif test.__name__.startswith("Py"):
            namespace = py_io_ns
        else:
            continue
        for name, obj in namespace.items():
            setattr(test, name, obj)

    support.run_unittest(*tests)


if __name__ == "__main__":
    test_main()