blob: c1ea6f245f35df7c2278dca9f43ca80cb1e8081f [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)

################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Serhiy Storchaka78980432013-01-15 01:12:17 +020035import _testcapi
Antoine Pitrou131a4892012-10-16 22:57:11 +020036from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020037from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000038from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010047try:
48 import fcntl
49except ImportError:
50 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000051
def _default_chunk_size():
    """Get the default TextIOWrapper chunk size.

    Opens this source file in text mode and returns the private
    ``_CHUNK_SIZE`` attribute of the resulting file object.
    """
    with open(__file__, "r", encoding="latin-1") as f:
        return f._CHUNK_SIZE
56
57
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto().

    *read_stack* is a sequence of chunks handed out, in order, by
    readinto().  A chunk of ``None`` makes one readinto() call return None.
    Everything passed to write() is recorded in ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        # Total number of read calls, and how many of them happened after
        # the read stack was exhausted.
        self._reads = 0
        self._extraneous_reads = 0

    def write(self, b):
        """Record a bytes copy of *b* and report it as fully written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        # *whence* defaults to 0 to mirror the io.IOBase.seek() signature.
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the next pending chunk into *buf*.

        Returns the number of bytes copied, None if the next chunk is
        None, or 0 once the read stack is empty.  A chunk larger than
        *buf* is split: the remainder stays at the head of the stack.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            self._extraneous_reads += 1
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            del self._read_stack[0]
            buf[:n] = data
            return n
        # The chunk does not fit: fill the buffer and keep the rest queued.
        buf[:] = data[:max_len]
        self._read_stack[0] = data[max_len:]
        return max_len

    def truncate(self, pos=None):
        return pos
113
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead on top of the C RawIOBase."""
116
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead on top of the pure-Python RawIOBase."""
119
120
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus an explicit read() method."""

    def read(self, n=None):
        """Return the next pending chunk whole, ignoring *n*.

        Returns b"" (and counts an extraneous read) once the read stack is
        empty.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted stack is expected here; anything else
            # (e.g. KeyboardInterrupt) must propagate.
            self._extraneous_reads += 1
            return b""
130
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO on top of the C RawIOBase."""
133
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO on top of the pure-Python RawIOBase."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000136
Guido van Rossuma9e20242007-03-08 00:43:48 +0000137
class MisbehavedRawIO(MockRawIO):
    """A raw stream whose methods deliberately return bogus values.

    write() and readinto() report more bytes than were handled, read()
    returns its data doubled, and seek()/tell() return negative positions.
    """

    def write(self, b):
        return super().write(b) * 2

    def read(self, n=None):
        return super().read(n) * 2

    def seek(self, pos, whence=0):
        # *whence* defaults to 0 to mirror the io.IOBase.seek() signature.
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        super().readinto(buf)
        return len(buf) * 5
154
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO on top of the C RawIOBase."""
157
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO on top of the pure-Python RawIOBase."""
160
161
class CloseFailureIO(MockRawIO):
    """A raw stream whose first close() call raises OSError.

    The stream is marked closed before raising, so later close() calls
    return silently.
    """

    closed = 0

    def close(self):
        if not self.closed:
            self.closed = 1
            raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000169
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO on top of the C RawIOBase."""
172
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO on top of the pure-Python RawIOBase."""
175
176
class MockFileIO:
    """Mixin that records the result size of every read()/readinto() call.

    Must be combined with a concrete stream class (it delegates to
    ``super()``).  ``read_history`` holds, per call, the number of bytes
    obtained, or None when the underlying call returned None.
    """

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        res = super().read(n)
        self.read_history.append(None if res is None else len(res))
        return res

    def readinto(self, b):
        res = super().readinto(b)
        self.read_history.append(res)
        return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000192
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO on top of the C BytesIO."""
Guido van Rossuma9e20242007-03-08 00:43:48 +0000195
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO on top of the pure-Python BytesIO."""
198
199
class MockUnseekableIO:
    """Mixin that makes a stream report itself as unseekable.

    Concrete subclasses must provide an ``UnsupportedOperation`` class
    attribute; seek() and tell() raise it.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        raise self.UnsupportedOperation("not seekable")

    def tell(self, *args):
        raise self.UnsupportedOperation("not seekable")
209
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """MockUnseekableIO on top of the C BytesIO."""
    UnsupportedOperation = io.UnsupportedOperation
212
class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """MockUnseekableIO on top of the pure-Python BytesIO."""
    UnsupportedOperation = pyio.UnsupportedOperation
215
216
class MockNonBlockWriterIO:
    """A writable raw stream that can simulate a would-block condition.

    After block_on(char), a write() whose data contains *char* is cut
    short just before it; a write() whose data starts with *char* returns
    None once and clears the blocker.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and reset the record."""
        s = b"".join(self._write_stack)
        self._write_stack[:] = []
        return s

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Record *b*, honouring the blocker set by block_on().

        Returns the number of bytes accepted, or None to signal that the
        write would block.
        """
        b = bytes(b)
        if self._blocker_char:
            try:
                n = b.index(self._blocker_char)
            except ValueError:
                # No blocker in this chunk: fall through to a full write.
                pass
            else:
                if n > 0:
                    # write data up to the first blocker
                    self._write_stack.append(b[:n])
                    return n
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        self._write_stack.append(b)
        return len(b)
260
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO on top of the C RawIOBase."""
    BlockingIOError = io.BlockingIOError
263
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO on top of the pure-Python RawIOBase."""
    BlockingIOError = pyio.BlockingIOError
266
Guido van Rossuma9e20242007-03-08 00:43:48 +0000267
class IOTest(unittest.TestCase):
    # Base tests for the io module.  The tests read the implementation
    # under test from attributes of self (self.open, self.FileIO,
    # self.BytesIO, self.IOBase, ...), which are not defined in this class.

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        """Run a fixed sequence of write/seek/truncate checks on *f*.

        Leaves b"hello world\\n" as the stream's content.
        """
        self.assertEqual(f.write(b"blah."), 5)
        # truncate() must not move the file position.
        f.truncate(0)
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Run a fixed sequence of read/seek checks on *f*.

        *f* must contain b"hello world\\n".  With *buffered* true, read()
        without a size argument is exercised as well.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    LARGE = 2**31

    def large_file_ops(self, f):
        """Run seek/write/truncate checks around offset self.LARGE on *f*."""
        # Use assert methods rather than bare asserts, which are stripped
        # when Python runs with -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        exc = self.UnsupportedOperation
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(exc, fp.read)
                self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "wb", buffering=0) as fp:
            self.assertRaises(exc, fp.read)
            self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "rb", buffering=0) as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(exc, fp.write, "blah")
            self.assertRaises(exc, fp.writelines, ["blah\n"])
            # Non-zero seeking from current or end pos
            self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
            self.assertRaises(exc, fp.seek, -1, self.SEEK_END)

    def test_open_handles_NUL_chars(self):
        fn_with_NUL = 'foo\0bar'
        self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
        self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')

    def test_raw_file_io(self):
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; It takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                # Report the test as skipped instead of silently passing.
                self.skipTest(
                    "large file ops on %s require %d bytes and a long time; "
                    "use 'regrtest.py -u largefile test_io' to run it"
                    % (sys.platform, self.LARGE))
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        with support.check_warnings(('', ResourceWarning)):
            f = MyFileIO(support.TESTFN, "wb")
            f.write(b"xxx")
            del f
            support.gc_collect()
            self.assertEqual(record, [1, 2, 3])
            with self.open(support.TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        """Check that destroying a *base* subclass instance calls __del__,
        close() and flush(), in that order."""
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        a = array.array('i', range(10))
        n = len(a.tobytes())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        with support.check_warnings(('', ResourceWarning)):
            f = self.FileIO(support.TESTFN, "wb")
            f.write(b"abcxxx")
            # Put the object in a reference cycle so that only the garbage
            # collector can reclaim it.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise OSError()
        f.flush = bad_flush
        self.assertRaises(OSError, f.close) # exception not swallowed
        self.assertTrue(f.closed)

    def test_multi_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")

    def test_types_have_dict(self):
        test = (
            self.IOBase(),
            self.RawIOBase(),
            self.TextIOBase(),
            self.StringIO(),
            self.BytesIO()
        )
        for obj in test:
            self.assertTrue(hasattr(obj, "__dict__"))

    def test_opener(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        fd = os.open(support.TESTFN, os.O_RDONLY)
        def opener(path, flags):
            return fd
        # The file name is ignored: the opener supplies the descriptor.
        with self.open("non-existent", "r", opener=opener) as f:
            self.assertEqual(f.read(), "egg\n")

    def test_fileio_closefd(self):
        # Issue #4841
        with self.open(__file__, 'rb') as f1, \
             self.open(__file__, 'rb') as f2:
            fileio = self.FileIO(f1.fileno(), closefd=False)
            # .__init__() must not close f1
            fileio.__init__(f2.fileno(), closefd=False)
            f1.readline()
            # .close() must not close f2
            fileio.close()
            f2.readline()
660
661
class CIOTest(IOTest):
    # IOTest variant that adds a test specific to the C implementation.

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        obj.obj = obj
        wr = weakref.ref(obj)
        del MyIO
        del obj
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000681
class PyIOTest(IOTest):
    # IOTest variant with no additional tests of its own.
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000684
Guido van Rossuma9e20242007-03-08 00:43:48 +0000685
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom.
    # The class under test is read from `self.tp`; the mock raw streams are
    # read from attributes of `self` (MockRawIO, CloseFailureIO, ...), which
    # are not defined in this class.

    def test_detach(self):
        # detach() returns the raw stream; detaching twice is an error.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        self.assertIs(buf.detach(), raw)
        self.assertRaises(ValueError, buf.detach)

    def test_fileno(self):
        # The test expects fileno() on a MockRawIO-backed object to be 42.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        self.assertEqual(42, bufio.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Invalid whence
        self.assertRaises(ValueError, bufio.seek, 0, -1)
        self.assertRaises(ValueError, bufio.seek, 0, 9)

    def test_override_destructor(self):
        # Records the order in which the overridden __del__ (1), close (2)
        # and flush (3) run when the object is garbage collected.
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        # Must be queried before the object is deleted.
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        if writable:
            self.assertEqual(record, [1, 2, 3])
        else:
            self.assertEqual(record, [1, 2])

    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        def _with():
            with bufio:
                pass
        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            self.tp(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception OSError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    def test_repr(self):
        # repr() shows the qualified class name plus the raw stream's name,
        # once the raw stream has one (str or bytes).
        raw = self.MockRawIO()
        b = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(b), "<%s>" % clsname)
        raw.name = "dummy"
        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)

    def test_flush_error_on_close(self):
        # A failing raw flush() must propagate out of close(), and the
        # buffered object must still end up closed.
        raw = self.MockRawIO()
        def bad_flush():
            raise OSError()
        raw.flush = bad_flush
        b = self.tp(raw)
        self.assertRaises(OSError, b.close) # exception not swallowed
        self.assertTrue(b.closed)

    def test_close_error_on_close(self):
        # When both flush() and the raw close() fail, the close() error is
        # raised with the flush() error as its __context__.
        raw = self.MockRawIO()
        def bad_flush():
            raise OSError('flush')
        def bad_close():
            raise OSError('close')
        raw.close = bad_close
        b = self.tp(raw)
        b.flush = bad_flush
        with self.assertRaises(OSError) as err: # exception not swallowed
            b.close()
        self.assertEqual(err.exception.args, ('close',))
        self.assertEqual(err.exception.__context__.args, ('flush',))
        self.assertFalse(b.closed)

        # Silence destructor error: the object is still open, so its
        # finalizer would otherwise call the failing flush()/close() again.
        raw.close = lambda: None
        b.flush = lambda: None

    def test_multi_close(self):
        # close() may be called repeatedly; flush() afterwards must fail.
        raw = self.MockRawIO()
        b = self.tp(raw)
        b.close()
        b.close()
        b.close()
        self.assertRaises(ValueError, b.flush)

    def test_unseekable(self):
        # tell() and seek() must raise UnsupportedOperation on a
        # MockUnseekableIO raw stream.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)

    def test_readonly_attributes(self):
        # The `raw` attribute cannot be rebound.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        x = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buf.raw = x
821
Guido van Rossum78892e42007-04-06 17:31:18 +0000822
class SizeofTest:
    # sys.getsizeof() checks; both tests are marked CPython-only.

    @support.cpython_only
    def test_sizeof(self):
        # The reported size must differ by exactly the difference in
        # requested buffer sizes.
        small, large = 4096, 8192
        bufio = self.tp(self.MockRawIO(), buffer_size=small)
        overhead = sys.getsizeof(bufio) - small
        bufio = self.tp(self.MockRawIO(), buffer_size=large)
        self.assertEqual(sys.getsizeof(bufio), overhead + large)

    @support.cpython_only
    def test_buffer_freeing(self):
        # After close() the reported size must no longer include the buffer.
        capacity = 4096
        bufio = self.tp(self.MockRawIO(), buffer_size=capacity)
        overhead = sys.getsizeof(bufio) - capacity
        bufio.close()
        self.assertEqual(sys.getsizeof(bufio), overhead)
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200844
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Mode used by test_threads when opening the raw file.
    read_mode = "rb"

    def test_constructor(self):
        # __init__() may be re-run with valid sizes; invalid sizes raise.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        for good_size in (1024, 16):
            bufio.__init__(rawio, buffer_size=good_size)
        self.assertEqual(b"abc", bufio.read())
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_read(self):
        for size in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(size))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        # (expected data, expected rawio._reads afterwards) for each
        # successive read1(100) call.
        steps = ((b"c", 1), (b"d", 2), (b"efg", 3), (b"", 4))
        for expected, raw_reads in steps:
            self.assertEqual(expected, bufio.read1(100))
            self.assertEqual(rawio._reads, raw_reads)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        # (return value, buffer contents afterwards) for each readinto(b).
        steps = ((2, b"ab"), (2, b"cd"), (2, b"ef"), (1, b"gf"), (0, b"gf"))
        for nread, contents in steps:
            self.assertEqual(bufio.readinto(b), nread)
            self.assertEqual(b, contents)
        # Same buffer again, now with a None entry after the data.
        rawio = self.MockRawIO((b"abc", None))
        bufio = self.tp(rawio)
        for nread, contents in ((2, b"ab"), (1, b"cb")):
            self.assertEqual(bufio.readinto(b), nread)
            self.assertEqual(b, contents)

    def test_readlines(self):
        def make_bufio():
            # Fresh stream for every call so the checks are independent.
            return self.tp(self.MockRawIO((b"abc\n", b"d\n", b"ef")))
        every_line = [b"abc\n", b"d\n", b"ef"]
        self.assertEqual(make_bufio().readlines(), every_line)
        self.assertEqual(make_bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(make_bufio().readlines(None), every_line)

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each case: buffer size, sizes passed to bufio.read(), and the
        # resulting raw read history that is expected.
        cases = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in cases:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            offset = 0
            for nbytes in buf_read_sizes:
                expected = data[offset:offset+nbytes]
                self.assertEqual(bufio.read(nbytes), expected)
                offset += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        # The raw mock itself: readall() gives the data, then None.
        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())

    def test_read_past_eof(self):
        # Asking for more than is available returns what there is.
        bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        # read() without a size returns everything.
        bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(values)
            with self.open(support.TESTFN, "wb") as f:
                f.write(payload)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def reader():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            chunk = bufio.read(n)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=reader) for _ in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                combined = b''.join(results)
                # Every byte value must have been read exactly N times.
                for i in range(256):
                    self.assertEqual(combined.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        # Same checks before and after some data has been read.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        unsupported = self.UnsupportedOperation
        self.assertRaises(unsupported, bufio.tell)
        self.assertRaises(unsupported, bufio.seek, 0)
        bufio.read(1)
        self.assertRaises(unsupported, bufio.seek, 0)
        self.assertRaises(unsupported, bufio.tell)

    def test_misbehaved_io(self):
        # seek() and tell() must raise OSError on a MisbehavedRawIO stream.
        bufio = self.tp(self.MisbehavedRawIO((b"abc", b"d", b"efg")))
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            # Simple case first: one raw read is enough to satisfy the
            # request.  Then a more complex case where two raw reads are
            # needed to satisfy the request.
            for chunks in ([b"x" * n], [b"x" * (n - 1), b"x"]):
                rawio = self.MockRawIO(chunks)
                bufio = self.tp(rawio, bufsize)
                self.assertEqual(bufio.read(n), b"x" * n)
                self.assertEqual(rawio._extraneous_reads, 0,
                    "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1040
1041
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the reader tests against io.BufferedReader.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After every failed __init__() the object must refuse to read.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(OSError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The test creates TESTFN; make sure it is removed afterwards.
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            # Self-reference: f is now part of a reference cycle.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1088
1089
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the inherited reader tests against pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001092
Guido van Rossuma9e20242007-03-08 00:43:48 +00001093
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Mode used by the tests that open a real file as the raw stream.
    write_mode = "wb"

    def test_constructor(self):
        # __init__() may be re-run with valid sizes; invalid sizes raise and
        # a later valid __init__() makes the object usable again.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # detach() must push buffered data down to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # `intermediate_func` is called with the buffered object after every
        # write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            # Clamp the final write to what is left.
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seek around after each write, always returning to the original
        # position: first with absolute seeks, then with relative ones.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        # Overwrite the start after seeking back, then append at the end.
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        l = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # Same as test_writelines, but with a UserList instead of a list.
        l = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        # Non-bytes items, None and a str must all be rejected.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
        self.assertRaises(TypeError, bufio.writelines, None)
        self.assertRaises(TypeError, bufio.writelines, 'abc')

    def test_destructor(self):
        # Destroying the buffered object must flush pending data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # The test creates TESTFN; make sure it is removed afterwards.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Every byte value must have been written exactly N times.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # seek(), tell() and an overflowing write must raise OSError on a
        # MisbehavedRawIO stream.
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)
        self.assertRaises(OSError, bufio.write, b"abcdef")

    def test_max_buffer_size_removal(self):
        # A third positional argument must be rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        # A failing raw write() during close() must propagate, and the
        # buffered object must still end up closed.
        raw = self.MockRawIO()
        def bad_write(b):
            raise OSError()
        raw.write = bad_write
        b = self.tp(raw)
        b.write(b'spam')
        self.assertRaises(OSError, b.close) # exception not swallowed
        self.assertTrue(b.closed)
1341
Benjamin Peterson59406a92009-03-26 17:10:29 +00001342
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the writer tests against io.BufferedWriter.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After every failed __init__() the object must refuse to write.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The test creates TESTFN; make sure it is removed afterwards.
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            f.write(b"123xxx")
            # Self-reference: f is now part of a reference cycle.
            f.x = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1386
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001387
class PyBufferedWriterTest(BufferedWriterTest):
    """Run the BufferedWriter tests against the pure-Python implementation."""

    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001390
class BufferedRWPairTest(unittest.TestCase):
    """Shared tests for BufferedRWPair; subclasses provide ``tp``."""

    def test_constructor(self):
        reader, writer = self.MockRawIO(), self.MockRawIO()
        pair = self.tp(reader, writer)
        self.assertFalse(pair.closed)

    def test_detach(self):
        reader, writer = self.MockRawIO(), self.MockRawIO()
        pair = self.tp(reader, writer)
        with self.assertRaises(self.UnsupportedOperation):
            pair.detach()

    def test_constructor_max_buffer_size_removal(self):
        # A fourth positional argument is no longer accepted.
        reader, writer = self.MockRawIO(), self.MockRawIO()
        self.assertRaises(TypeError, self.tp, reader, writer, 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        reader, writer = NotReadable(), self.MockRawIO()
        with self.assertRaises(OSError):
            self.tp(reader, writer)

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        reader, writer = self.MockRawIO(), NotWriteable()
        with self.assertRaises(OSError):
            self.tp(reader, writer)

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # Sized reads followed by a read-to-EOF.
        for size, expected in ((3, b"abc"), (1, b"d")):
            self.assertEqual(pair.read(size), expected)
        self.assertEqual(pair.read(), b"ef")

        # An explicit None also means "read everything".
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        def make_pair():
            return self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())

        all_lines = [b"abc\n", b"def\n", b"h"]
        self.assertEqual(make_pair().readlines(), all_lines)
        self.assertEqual(make_pair().readlines(), all_lines)
        self.assertEqual(make_pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        target = bytearray(5)
        count = pair.readinto(target)
        self.assertEqual(count, 5)
        self.assertEqual(target, b"abcde")

    def test_write(self):
        sink = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), sink)

        # Each flush must push exactly one chunk to the raw writer.
        for chunk in (b"abc", b"def"):
            pair.write(chunk)
            pair.flush()
        self.assertEqual(sink._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        peeked = pair.peek(3)
        self.assertTrue(peeked.startswith(b"abc"))
        # Peeking must not consume anything.
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # (reader is a tty, writer is a tty, pair reports a tty)
        combinations = (
            (False, False, False),
            (True, False, True),
            (False, True, True),
            (True, True, True),
        )
        for reader_tty, writer_tty, expected in combinations:
            pair = self.tp(SelectableIsAtty(reader_tty),
                           SelectableIsAtty(writer_tty))
            check = self.assertTrue if expected else self.assertFalse
            check(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001507
class CBufferedRWPairTest(BufferedRWPairTest):
    """Run the BufferedRWPair tests against the C implementation."""

    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001510
class PyBufferedRWPairTest(BufferedRWPairTest):
    """Run the BufferedRWPair tests against the pure-Python implementation."""

    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001513
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001514
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    """Shared tests for BufferedRandom; subclasses provide ``tp``."""

    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Both inherited constructor tests apply to a read/write object.
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_constructor(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        stream = self.tp(raw, 8)

        self.assertEqual(b"as", stream.read(2))
        for chunk in (b"ddd", b"eee"):
            stream.write(chunk)
        # Nothing reaches the raw stream while the writes fit in the buffer.
        self.assertFalse(raw._write_stack)
        self.assertEqual(b"ghjk", stream.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        stream = self.tp(raw)

        self.assertEqual(b"as", stream.read(2))
        self.assertEqual(2, stream.tell())
        stream.seek(0, 0)
        self.assertEqual(b"asdf", stream.read(4))

        # Overwrite the middle of the data, then re-read from the start.
        stream.write(b"123f")
        stream.seek(0, 0)
        self.assertEqual(b"asdf123fl", stream.read())
        self.assertEqual(9, stream.tell())
        # Seek relative to the end, then relative to the current position.
        stream.seek(-4, 2)
        self.assertEqual(5, stream.tell())
        stream.seek(2, 1)
        self.assertEqual(7, stream.tell())
        self.assertEqual(b"fl", stream.read(11))
        stream.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        with self.assertRaises(TypeError):
            stream.seek(0.0)

    def check_flush_and_read(self, read_func):
        # Helper: *read_func(bufio[, n])* must return the bytes it consumed.
        raw = self.BytesIO(b"abcdefghi")
        stream = self.tp(raw)

        self.assertEqual(b"ab", read_func(stream, 2))
        stream.write(b"12")
        self.assertEqual(b"ef", read_func(stream, 2))
        self.assertEqual(6, stream.tell())
        stream.flush()
        self.assertEqual(6, stream.tell())
        self.assertEqual(b"ghi", read_func(stream))
        # Change the raw stream behind the buffered object's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        stream.flush()
        stream.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(stream, 3))

    def test_flush_and_read(self):
        def _read(bufio, *args):
            return bufio.read(*args)
        self.check_flush_and_read(_read)

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            size = n if n >= 0 else 9999
            target = bytearray(size)
            count = bufio.readinto(target)
            return bytes(target[:count])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            data = bufio.peek(n)
            if n != -1:
                data = data[:n]
            # peek() does not advance, so consume the bytes explicitly.
            bufio.seek(len(data), 1)
            return data
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        stream = self.tp(raw)

        for chunk in (b"123", b"45"):
            stream.write(chunk)
            stream.flush()
        stream.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", stream.read())

    def test_threads(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_threads(self)

    def test_writes_and_peek(self):
        def _peek_in_place(bufio):
            bufio.peek(1)
        self.check_writes(_peek_in_place)

        def _peek_previous_byte(bufio):
            saved = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(saved, 0)
        self.check_writes(_peek_previous_byte)

    def test_writes_and_reads(self):
        def _reread(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_reread)

    def test_writes_and_read1s(self):
        def _reread1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_reread1)

    def test_writes_and_readintos(self):
        def _rereadinto(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_rereadinto)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            stream = self.tp(raw, 4)
            end_pos = overwrite_size + 1
            # Trigger readahead
            self.assertEqual(stream.read(1), b"A")
            self.assertEqual(stream.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            stream.write(b"B" * overwrite_size)
            self.assertEqual(stream.tell(), end_pos)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            stream.flush()
            self.assertEqual(stream.tell(), end_pos)
            expected = b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size)
            self.assertEqual(raw.getvalue(), expected)

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards /
        # writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        original = b"\x80\x81\x82\x83\x84"
        size = len(original)
        for first in range(size):
            for second in range(first, size):
                raw = self.BytesIO(original)
                stream = self.tp(raw, 100)
                mutate(stream, first, second)
                stream.flush()
                # The later (rewound) write wins when both hit the same byte.
                expected = bytearray(original)
                expected[second] = 2
                expected[first] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (first, second))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        stream = self.tp(raw, 100)
        # the read buffer gets filled
        self.assertEqual(stream.read(2), b"AA")
        self.assertEqual(stream.truncate(), 2)
        # the write buffer increases
        self.assertEqual(stream.write(b"BB"), 2)
        self.assertEqual(stream.truncate(), 4)

    def test_misbehaved_io(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as raw, self.tp(raw, 100) as f:
            f.write(b"1")
            self.assertEqual(f.read(1), b'b')
            f.write(b'2')
            self.assertEqual(f.read1(1), b'd')
            f.write(b'3')
            one_byte = bytearray(1)
            f.readinto(one_byte)
            self.assertEqual(one_byte, b'f')
            f.write(b'4')
            self.assertEqual(f.peek(1), b'h')
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as raw, self.tp(raw, 100) as f:
            self.assertEqual(f.read(1), b'a')
            f.write(b"2")
            self.assertEqual(f.read(1), b'c')
            f.flush()
            self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        with self.BytesIO(b'ab\ncdef\ng\n') as raw, self.tp(raw) as f:
            # Alternate one-byte writes with readline() calls.
            steps = ((b'1', b'b\n'), (b'2', b'def\n'), (b'3', b'\n'))
            for written, expected_line in steps:
                f.write(written)
                self.assertEqual(f.readline(), expected_line)
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None
1738
R David Murray67bfe802013-02-23 21:51:05 -05001739
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    """Run the BufferedRandom tests against the C implementation."""

    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        if sys.maxsize <= 0x7FFFFFFF:
            # The allocation can succeed on 32-bit builds, e.g. with more
            # than 2GB RAM and a 64-bit kernel, so there is nothing to check.
            return
        raw = self.MockRawIO()
        stream = self.tp(raw)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            stream.__init__(raw, sys.maxsize)

    def test_garbage_collection(self):
        # Reuse the reader and writer collection tests of the C classes.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275: the TypeError message must name the right type.
        stream = io.BytesIO()
        self.assertRaisesRegex(TypeError, "BufferedRandom",
                               self.tp, stream, 1024, 1024, 1024)
1761
1762
class PyBufferedRandomTest(BufferedRandomTest):
    """Run the BufferedRandom tests against the pure-Python implementation."""

    tp = pyio.BufferedRandom
1765
1766
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001767# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1768# properties:
1769# - A single output character can correspond to many bytes of input.
1770# - The number of input bytes to complete the character can be
1771# undetermined until the last input byte is received.
1772# - The number of input bytes can vary depending on previous input.
1773# - A single input byte can correspond to many characters of output.
1774# - The number of output characters can be undetermined until the
1775# last input byte is received.
1776# - The number of output characters can vary depending on previous input.
1777
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        super().__init__(errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and no buffered input."""
        self.buffer = bytearray()
        self.i = self.o = 1

    def getstate(self):
        """Return ``(buffered_bytes, flags)`` with ``flags == 0`` after reset()."""
        # XOR with 1 maps the initial I = O = 1 onto a zero flag value.
        flags = (self.i ^ 1) * 100 + (self.o ^ 1)
        return bytes(self.buffer), flags

    def setstate(self, state):
        """Restore a state previously returned by getstate()."""
        pending, flags = state
        self.buffer = bytearray(pending)
        packed_i, packed_o = divmod(flags, 100)
        self.i = packed_i ^ 1
        self.o = packed_o ^ 1

    def decode(self, input, final=False):
        """Feed the bytes of *input* to the decoder and return the output text.

        When *final* is true, a partially buffered word is emitted as well.
        """
        period = ord('.')
        pieces = []
        for byte in input:
            # self.i is re-read on every byte: a word can change it.
            if self.i != 0:
                # fixed-length, terminate after self.i bytes
                self.buffer.append(byte)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
            elif byte != period:
                # variable-length, still inside a word
                self.buffer.append(byte)
            elif self.buffer:
                # variable-length, a period ends a non-empty word
                pieces.append(self.process_word())
        if final and self.buffer:
            # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word; return its output ('' for i/o words)."""
        marker = self.buffer[0]
        text = ''
        if marker == ord('i'):
            # set input length
            self.i = min(99, int(self.buffer[1:] or 0))
        elif marker == ord('o'):
            # set output length
            self.o = min(99, int(self.buffer[1:] or 0))
        else:
            # pad out with hyphens
            text = self.buffer.decode('ascii').ljust(self.o, '-')
            if self.o:
                # truncate to output length
                text = text[:self.o]
            text += '.'
        self.buffer = bytearray()
        return text

    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function for 'test_decoder'; active only when enabled."""
        if not cls.codecEnabled or name != 'test_decoder':
            return None
        latin1 = codecs.lookup('latin-1')
        return codecs.CodecInfo(
            name='test_decoder',
            encode=latin1.encode,
            decode=None,
            incrementalencoder=None,
            incrementaldecoder=cls,
            streamreader=None,
            streamwriter=None)

# Register the previous decoder for testing.
# Disabled by default, tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1866
1867
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.'
         'b----------------------------.'
         'cde--------------------------.'
         'abcdefghijabcde.'
         'a.b------------.'
         '.c.------------.'
         'd.e------------.'
         'k--------------.'
         'l--------------.'
         'm--------------.'),
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for data, final, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(data, final), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001910
1911class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001912
def setUp(self):
    # Start from a clean slate: no leftover scratch file.
    support.unlink(support.TESTFN)
    # Sample data mixing all newline conventions, and its normalized text.
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001917
def tearDown(self):
    # Remove the scratch file a test may have created.
    scratch_path = support.TESTFN
    support.unlink(scratch_path)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001920
def test_constructor(self):
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    wrapper = self.TextIOWrapper(buffered)

    # Re-initializing must replace the encoding and line_buffering settings.
    wrapper.__init__(buffered, encoding="latin-1", newline="\r\n")
    self.assertEqual(wrapper.encoding, "latin-1")
    self.assertEqual(wrapper.line_buffering, False)

    wrapper.__init__(buffered, encoding="utf-8", line_buffering=True)
    self.assertEqual(wrapper.encoding, "utf-8")
    self.assertEqual(wrapper.line_buffering, True)
    self.assertEqual("\xe9\n", wrapper.readline())

    # Invalid newline arguments are rejected by type, then by value.
    with self.assertRaises(TypeError):
        wrapper.__init__(buffered, newline=42)
    with self.assertRaises(ValueError):
        wrapper.__init__(buffered, newline='xyzzy')
1934
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)
    wrapper = self.TextIOWrapper(buffered)
    # detach() hands back the underlying buffer object itself.
    self.assertIs(wrapper.detach(), buffered)

    wrapper = self.TextIOWrapper(buffered, encoding="ascii")
    wrapper.write("howdy")
    # Nothing has reached the raw stream yet ...
    self.assertFalse(raw.getvalue())
    wrapper.detach()
    # ... but detaching makes the pending text visible there.
    self.assertEqual(raw.getvalue(), b"howdy")
    # A second detach() on the same wrapper is an error.
    with self.assertRaises(ValueError):
        wrapper.detach()
1947
def test_repr(self):
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    wrapper = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    # Only the encoding is shown while no name or mode is available.
    expected = "<%s.TextIOWrapper encoding='utf-8'>" % modname
    self.assertEqual(repr(wrapper), expected)

    # A name set on the raw stream shows up in the repr.
    raw.name = "dummy"
    expected = "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(wrapper), expected)

    # So does a mode set on the wrapper itself.
    wrapper.mode = "r"
    expected = "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname
    self.assertEqual(repr(wrapper), expected)

    # Bytes names are rendered with their b'' prefix.
    raw.name = b"dummy"
    expected = "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname
    self.assertEqual(repr(wrapper), expected)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001964
def test_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    wrapper = self.TextIOWrapper(buffered, newline="\n", line_buffering=True)
    # (text written, raw stream content expected right afterwards)
    steps = (
        ("X", b""),                # No flush happened
        ("Y\nZ", b"XY\nZ"),        # All got flushed
        ("A\rB", b"XY\nZA\rB"),
    )
    for text, expected in steps:
        wrapper.write(text)
        self.assertEqual(raw.getvalue(), expected)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001975
def test_default_encoding(self):
    saved_environ = dict(os.environ)
    try:
        # try to get a user preferred encoding different than the current
        # locale encoding to check that TextIOWrapper() uses the current
        # locale encoding and not the user preferred encoding
        for name in ('LC_ALL', 'LANG', 'LC_CTYPE'):
            os.environ.pop(name, None)

        expected = locale.getpreferredencoding(False)
        wrapper = self.TextIOWrapper(self.BytesIO())
        self.assertEqual(wrapper.encoding, expected)
    finally:
        # Put the environment back exactly as it was found.
        os.environ.clear()
        os.environ.update(saved_environ)
1993
Serhiy Storchaka78980432013-01-15 01:12:17 +02001994 # Issue 15989
def test_device_encoding(self):
    # A fileno() just past INT_MAX or UINT_MAX must be reported as an
    # OverflowError when the wrapper is constructed.
    stream = self.BytesIO()
    for limit in (_testcapi.INT_MAX, _testcapi.UINT_MAX):
        stream.fileno = lambda value=limit + 1: value
        with self.assertRaises(OverflowError):
            self.TextIOWrapper(stream)
2001
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    raw = self.BytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="utf-8")
    self.assertEqual(wrapper.encoding, "utf-8")
    # Without an explicit encoding a default one must still be reported,
    # and it has to name a codec that actually exists.
    wrapper = self.TextIOWrapper(raw)
    self.assertIsNotNone(wrapper.encoding)
    codecs.lookup(wrapper.encoding)
2010
def test_encoding_errors_reading(self):
    def make_wrapper(**options):
        # Fresh ASCII wrapper over data containing one undecodable byte.
        raw = self.BytesIO(b"abc\n\xff\n")
        return self.TextIOWrapper(raw, encoding="ascii", **options)

    # (1) default
    wrapper = make_wrapper()
    self.assertRaises(UnicodeError, wrapper.read)
    # (2) explicit strict
    wrapper = make_wrapper(errors="strict")
    self.assertRaises(UnicodeError, wrapper.read)
    # (3) ignore
    wrapper = make_wrapper(errors="ignore")
    self.assertEqual(wrapper.read(), "abc\n\n")
    # (4) replace
    wrapper = make_wrapper(errors="replace")
    self.assertEqual(wrapper.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002028
def test_encoding_errors_writing(self):
    # (1) default
    raw = self.BytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    with self.assertRaises(UnicodeError):
        wrapper.write("\xff")
    # (2) explicit strict
    raw = self.BytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="ascii", errors="strict")
    with self.assertRaises(UnicodeError):
        wrapper.write("\xff")
    # (3) ignore drops the character, (4) replace writes '?' instead
    lenient_cases = (
        ("ignore", b"abcdef\n"),
        ("replace", b"abc?def\n"),
    )
    for handler, expected in lenient_cases:
        raw = self.BytesIO()
        wrapper = self.TextIOWrapper(raw, encoding="ascii", errors=handler,
                                     newline="\n")
        wrapper.write("abc\xffdef\n")
        wrapper.flush()
        self.assertEqual(raw.getvalue(), expected)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002052
def test_newlines(self):
    input_lines = ["unix\n", "windows\r\n", "os9\r", "last\n", "nonl"]

    # (newline argument, lines expected back from the wrapper)
    cases = [
        (None, ['unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl']),
        ('', input_lines),
        ('\n', ["unix\n", "windows\r\n", "os9\rlast\n", "nonl"]),
        ('\r\n', ["unix\nwindows\r\n", "os9\rlast\nnonl"]),
        ('\r', ["unix\nwindows\r", "\nos9\r", "last\nnonl"]),
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )
    text = ''.join(input_lines)

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        data = text.encode(encoding)
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in cases:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    if do_reads:
                        # Mix read(2) with readline() until EOF.
                        got_lines = []
                        for head in iter(lambda: textio.read(2), ''):
                            self.assertEqual(len(head), 2)
                            got_lines.append(head + textio.readline())
                    else:
                        got_lines = list(textio)

                    for got_line, exp_line in zip(got_lines, exp_lines):
                        self.assertEqual(got_line, exp_line)
                    self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002094
def test_newlines_input(self):
    # Decode one fixed byte string under each ``newline`` setting and
    # check readlines(), then a whole-file read() after rewinding.
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    translated_lines = normalized.decode("ascii").splitlines(keepends=True)
    verbatim_lines = testdata.decode("ascii").splitlines(keepends=True)
    expectations = (
        (None, translated_lines),
        ("", verbatim_lines),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    )
    for newline, expected in expectations:
        txt = self.TextIOWrapper(self.BytesIO(testdata),
                                 encoding="ascii", newline=newline)
        self.assertEqual(txt.readlines(), expected)
        txt.seek(0)
        self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002110
def test_newlines_output(self):
    # Bytes expected in the buffer, keyed by the ``newline`` argument
    # used when writing.
    testdict = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
    }
    # newline=None is expected to behave like the platform's os.linesep.
    cases = [(None, testdict[os.linesep])]
    cases.extend(sorted(testdict.items()))
    for newline, expected in cases:
        buf = self.BytesIO()
        txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
        for piece in ("AAA\nB", "BB\nCCC\n", "X\rY\r\nZ"):
            txt.write(piece)
        txt.flush()
        self.assertEqual(buf.closed, False)
        self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002128
def test_destructor(self):
    # Dropping the wrapper must flush the pending text and close the
    # buffer; close() below records what the buffer holds at that point.
    seen_at_close = []
    bytes_io = self.BytesIO

    class MyBytesIO(bytes_io):
        def close(self):
            seen_at_close.append(self.getvalue())
            bytes_io.close(self)

    b = MyBytesIO()
    wrapper = self.TextIOWrapper(b, encoding="ascii")
    wrapper.write("abc")
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002142
def test_override_destructor(self):
    # A subclass overriding __del__, close() and flush() must see them
    # called in that order when the wrapper is collected.
    calls = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # Chain to the parent finalizer only if one is defined.
            parent_del = getattr(super(), "__del__", None)
            if parent_del is not None:
                parent_del()

        def close(self):
            calls.append(2)
            super().close()

        def flush(self):
            calls.append(3)
            super().flush()

    b = self.BytesIO()
    wrapper = MyTextIO(b, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2165
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()

    def touch_missing_attribute():
        # The temporary wrapper is discarded while AttributeError is
        # propagating.
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    message = captured.getvalue().strip()
    if message:
        # The destructor *may* have printed an unraisable error, check it
        self.assertEqual(len(message.splitlines()), 1)
        self.assertTrue(message.startswith("Exception OSError: "), message)
        self.assertTrue(message.endswith(" ignored"), message)
Guido van Rossum8358db22007-08-18 21:39:55 +00002180
Guido van Rossum9b76da62007-04-11 01:09:03 +00002181 # Systematic tests of the text I/O API
2182
def test_basic_io(self):
    # Round-trip write/read/seek/tell for several encodings and chunk
    # sizes.  Files are opened with ``with`` so that a failing assertion
    # cannot leave a handle on support.TESTFN open.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        # "utf-16-be" and "utf-16-le" were commented out of this list.
        for enc in ("ascii", "latin-1", "utf-8"):
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Opaque position of end-of-file, reused below.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2211
def multi_line_test(self, f, enc):
    # Rewrite *f* with lines of assorted lengths, recording tell() before
    # each write, then read them back and require the same
    # (position, line) pairs.  *enc* is accepted but not used here.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    written = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        body = "".join(sample[i % len(sample)] for i in range(size))
        line = body + "\n"
        written.append((f.tell(), line))
        f.write(line)

    f.seek(0)
    observed = []
    pos = f.tell()
    line = f.readline()
    while line:
        observed.append((pos, line))
        pos = f.tell()
        line = f.readline()
    self.assertEqual(observed, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002233
def test_telling(self):
    # tell() must report the same positions while reading back as it did
    # while writing, and must raise OSError during line iteration.
    # The file is managed by ``with`` so a failing assertion cannot leak
    # the open handle.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            self.assertRaises(OSError, f.tell)
        # Once iteration has finished, tell() works again.
        self.assertEqual(f.tell(), p2)
2253
def test_seeking(self):
    # Start a multi-byte character two bytes before the size returned by
    # _default_chunk_size() (presumably the wrapper's default chunk size)
    # and check read()/tell()/readline() around it.
    chunk_size = _default_chunk_size()
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    line = prefix + u_suffix.encode("utf-8")
    with self.open(support.TESTFN, "wb") as out:
        out.write(line*2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        head = f.read(prefix_size)
        self.assertEqual(head, str(prefix, "ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002270
def test_seeking_too(self):
    # Regression test for a specific bug
    with self.open(support.TESTFN, "wb") as out:
        out.write(b'\xe0\xbf\xbf\n')
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        # Only required not to raise; the results are not inspected.
        f.readline()
        f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002281
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every handle is managed by ``with`` so that a failing assertion
        # cannot leave support.TESTFN open.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                # NOTE(review): _CHUNK_SIZE is not set on these handles,
                # so they use the wrapper's default chunk size -- confirm
                # this is intended.
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for case_bytes, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(case_bytes)

        # Position each test case so that it crosses a chunk boundary.
        for case_bytes, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(case_bytes)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + case_bytes, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002328
def test_encoded_writes(self):
    data = "1234567890"
    doubled = data * 2
    bom_encodings = ("utf-16",
                     "utf-16-le",
                     "utf-16-be",
                     "utf-32",
                     "utf-32-le",
                     "utf-32-be")
    for encoding in bom_encodings:
        buf = self.BytesIO()
        f = self.TextIOWrapper(buf, encoding=encoding)
        # Check if the BOM is written only once (see issue1753).
        for _ in range(2):
            f.write(data)
        # Read everything back twice, rewinding before each pass.
        for _ in range(2):
            f.seek(0)
            self.assertEqual(f.read(), doubled)
        self.assertEqual(buf.getvalue(), doubled.encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002348
def test_unreadable(self):
    # Reading through a wrapper whose buffer reports readable() == False
    # must raise OSError.
    class UnReadable(self.BytesIO):
        def readable(self):
            return False

    txt = self.TextIOWrapper(UnReadable())
    with self.assertRaises(OSError):
        txt.read()
Benjamin Petersona1b49012009-03-31 23:11:32 +00002355
def test_read_one_by_one(self):
    # Reading one character at a time must still collapse "\r\n" to "\n".
    txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    # read(1) returns "" at end of file, which ends the iteration.
    reads = "".join(iter(lambda: txt.read(1), ""))
    self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002365
def test_readlines(self):
    # readlines() with no hint, with None, and with a size hint of 5.
    txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
    every_line = ["AA\n", "BB\n", "CC"]
    self.assertEqual(txt.readlines(), every_line)
    txt.seek(0)
    self.assertEqual(txt.readlines(None), every_line)
    txt.seek(0)
    self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2373
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002374 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
def test_read_by_chunk(self):
    # make sure "\r\n" straddles 128 char boundary.
    txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
    # read(128) returns "" at end of file, which ends the iteration.
    reads = "".join(iter(lambda: txt.read(128), ""))
    self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002385
def test_writelines(self):
    # writelines() given a plain list writes the items back to back.
    pieces = ['ab', 'cd', 'ef']
    sink = self.BytesIO()
    txt = self.TextIOWrapper(sink)
    txt.writelines(pieces)
    txt.flush()
    self.assertEqual(sink.getvalue(), b'abcdef')
2393
def test_writelines_userlist(self):
    # writelines() must also accept a UserList, not only a real list.
    pieces = UserList(['ab', 'cd', 'ef'])
    sink = self.BytesIO()
    txt = self.TextIOWrapper(sink)
    txt.writelines(pieces)
    txt.flush()
    self.assertEqual(sink.getvalue(), b'abcdef')
2401
def test_writelines_error(self):
    # Arguments that are not iterables of str must raise TypeError.
    txt = self.TextIOWrapper(self.BytesIO())
    for bad_lines in ([1, 2, 3], None, b'abc'):
        self.assertRaises(TypeError, txt.writelines, bad_lines)
2407
def test_issue1395_1(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")

    # read one char at a time; read(1) returns "" at end of file
    reads = "".join(iter(lambda: txt.read(1), ""))
    self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002419
def test_issue1395_2(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # Read in blocks as large as the chunk size until end of file.
    reads = "".join(iter(lambda: txt.read(4), ""))
    self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002431
def test_issue1395_3(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # Mix block reads with readline(); the list is evaluated left to
    # right, so the calls happen in the order written.
    parts = [
        txt.read(4),
        txt.read(4),
        txt.readline(),
        txt.readline(),
        txt.readline(),
    ]
    self.assertEqual("".join(parts), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002442
def test_issue1395_4(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # A short read followed by read-to-end must yield the whole text.
    first = txt.read(4)
    remainder = txt.read()
    self.assertEqual(first + remainder, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002450
def test_issue1395_5(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # Consume four characters, remember the position, rewind, then seek
    # back to it: the next read must continue from that point.
    txt.read(4)
    bookmark = txt.tell()
    txt.seek(0)
    txt.seek(bookmark)
    self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002460
def test_issue2282(self):
    # The wrapper must report the same seekable() as its buffer.
    raw = self.BytesIO(self.testdata)
    txt = self.TextIOWrapper(raw, encoding="ascii")

    self.assertEqual(raw.seekable(), txt.seekable())
2466
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as out:
            out.write('aaa')
            # The original also called tell() here; the result is unused.
            out.tell()
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), 'aaa'.encode(charset))

        with self.open(path, 'a', encoding=charset) as out:
            out.write('xxx')
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002481
def test_seek_bom(self):
    # Same test, but when seeking manually
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as out:
            out.write('aaa')
            end_of_text = out.tell()
        with self.open(path, 'r+', encoding=charset) as update:
            # Append at the old end first, then overwrite from the start.
            update.seek(end_of_text)
            update.write('zzz')
            update.seek(0)
            update.write('bbb')
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002496
def test_errors_property(self):
    # ``errors`` is "strict" when not given, otherwise the value passed.
    with self.open(support.TESTFN, "w") as default_file:
        self.assertEqual(default_file.errors, "strict")
    with self.open(support.TESTFN, "w", errors="replace") as lenient_file:
        self.assertEqual(lenient_file.errors, "replace")
2502
@support.no_tracing
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    start_signal = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def run(n):
            text = "Thread%03d\n" % n
            # Block until every thread has started, then write at once.
            start_signal.wait()
            f.write(text)

        workers = [threading.Thread(target=run, args=(x,))
                   for x in range(20)]
        for worker in workers:
            worker.start()
        time.sleep(0.02)
        start_signal.set()
        for worker in workers:
            worker.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
        # Each thread's line must appear exactly once.
        for n in range(20):
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002525
def test_flush_error_on_close(self):
    # An error raised by flush() during close() must propagate, and the
    # wrapper must still end up closed.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")

    def failing_flush():
        raise OSError()

    txt.flush = failing_flush
    with self.assertRaises(OSError):  # exception not swallowed
        txt.close()
    self.assertTrue(txt.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00002533
def test_multi_close(self):
    # close() may be called repeatedly; flush() afterwards raises
    # ValueError.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    for _ in range(3):
        txt.close()
    with self.assertRaises(ValueError):
        txt.flush()
2540
def test_unseekable(self):
    # tell() and seek() on a wrapper over MockUnseekableIO must raise
    # UnsupportedOperation.
    txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    with self.assertRaises(self.UnsupportedOperation):
        txt.tell()
    with self.assertRaises(self.UnsupportedOperation):
        txt.seek(0)
2545
def test_readonly_attributes(self):
    # Assigning to the ``buffer`` attribute must raise AttributeError.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    replacement = self.BytesIO(self.testdata)
    with self.assertRaises(AttributeError):
        txt.buffer = replacement
2551
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    txt = self.TextIOWrapper(self.MockRawIO(chunks),
                             encoding='ascii', newline='\n')
    # Reads
    self.assertEqual(txt.read(4), 'abcd')
    self.assertEqual(txt.readline(), 'efghi\n')
    self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
2562
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                             write_through=True)
    for piece in ('1', '23\n4', '5'):
        txt.write(piece)
    # No flush() was called, yet the raw object has all the bytes.
    self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2572
def test_read_nonbytes(self):
    # Issue #17106
    # Crash when underlying read() returns non-bytes
    # A fresh wrapper is built for each call being checked.
    wrapper = self.TextIOWrapper(self.StringIO('a'))
    with self.assertRaises(TypeError):
        wrapper.read(1)
    wrapper = self.TextIOWrapper(self.StringIO('a'))
    with self.assertRaises(TypeError):
        wrapper.readline()
    wrapper = self.TextIOWrapper(self.StringIO('a'))
    with self.assertRaises(TypeError):
        wrapper.read()
2582
def test_illegal_decoder(self):
    # Issue #17106
    # Crash when decoder returns non-string
    def fresh_wrapper():
        # Each check gets its own wrapper over the same six bytes.
        return self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
                                  encoding='quopri_codec')

    wrapper = fresh_wrapper()
    with self.assertRaises(TypeError):
        wrapper.read(1)
    wrapper = fresh_wrapper()
    with self.assertRaises(TypeError):
        wrapper.readline()
    wrapper = fresh_wrapper()
    with self.assertRaises(TypeError):
        wrapper.read()
2595
2596
class CTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapperTest cases plus checks specific to the C implementation."""

    def test_initialization(self):
        # A failed re-__init__ must leave the wrapper unusable: read()
        # raises ValueError after each rejected ``newline`` value.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        reader = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(reader)
        for init_error, bad_newline in ((TypeError, 42),
                                        (ValueError, 'xyzzy')):
            self.assertRaises(init_error, wrapper.__init__, reader,
                              newline=bad_newline)
            self.assertRaises(ValueError, wrapper.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            rawio = io.FileIO(support.TESTFN, "wb")
            b = self.BufferedWriter(rawio)
            wrapper = self.TextIOWrapper(b, encoding="ascii")
            wrapper.write("456def")
            # Self-reference: only the cycle collector can free it.
            wrapper.x = wrapper
            wr = weakref.ref(wrapper)
            del wrapper
            support.gc_collect()
        self.assertTrue(wr() is None, wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references
            text_a.buddy = text_b
            text_b.buddy = text_a
        support.gc_collect()
2638
2639
class PyTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapperTest cases unchanged; this subclass adds no tests."""
2642
2643
class IncrementalNewlineDecoderTest(unittest.TestCase):
    """Tests for the IncrementalNewlineDecoder found on ``self``."""

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def _check_decode(b, s, **kwargs):
            # We exercise getstate() / setstate() as well as decode()
            saved = decoder.getstate()
            self.assertEqual(decoder.decode(b, **kwargs), s)
            decoder.setstate(saved)
            self.assertEqual(decoder.decode(b, **kwargs), s)

        # The decoder is stateful: every step below must run in this order.
        multibyte_steps = (
            (b'\xe8\xa2\x88', "\u8888"),
            # the same character fed one byte at a time, twice over
            (b'\xe8', ""),
            (b'\xa2', ""),
            (b'\x88', "\u8888"),
            (b'\xe8', ""),
            (b'\xa2', ""),
            (b'\x88', "\u8888"),
            # a lone lead byte is left pending for the final=True check
            (b'\xe8', ""),
        )
        for chunk, text in multibyte_steps:
            _check_decode(chunk, text)
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        _check_decode(b'\n', "\n")
        _check_decode(b'\r', "")
        _check_decode(b'', "\n", final=True)
        _check_decode(b'\r', "\n", final=True)

        newline_steps = (
            (b'\r', ""),
            (b'a', "\na"),

            (b'\r\r\n', "\n\n"),
            (b'\r', ""),
            (b'\r', "\n"),
            (b'\na', "\na"),

            (b'\xe8\xa2\x88\r\n', "\u8888\n"),
            (b'\xe8\xa2\x88', "\u8888"),
            (b'\n', "\n"),
            (b'\xe8\xa2\x88\r', "\u8888"),
            (b'\n', "\n"),
        )
        for chunk, text in newline_steps:
            _check_decode(chunk, text)

    def check_newline_decoding(self, decoder, encoding):
        # Feed text to *decoder* one unit at a time and check how its
        # ``newlines`` attribute changes along the way.
        pieces = []
        if encoding is None:
            encoder = None
        else:
            encoder = codecs.getincrementalencoder(encoding)()

        def _decode_bytewise(s):
            if encoder is None:
                # Decode one char at a time
                units = s
            else:
                # Decode one byte at a time
                units = (bytes([b]) for b in encoder.encode(s))
            for unit in units:
                pieces.append(decoder.decode(unit))

        self.assertEqual(decoder.newlines, None)
        # (text fed, value of decoder.newlines expected afterwards)
        steps = (
            ("abc\n\r", '\n'),
            ("\nabc", ('\n', '\r\n')),
            ("abc\r", ('\n', '\r\n')),
            ("abc", ('\r', '\n', '\r\n')),
        )
        for text, seen_newlines in steps:
            _decode_bytewise(text)
            self.assertEqual(decoder.newlines, seen_newlines)
        _decode_bytewise("abc\r")
        self.assertEqual("".join(pieces), "abc\n\nabcabc\nabcabc")

        # After reset() the decoder must have forgotten every newline seen.
        decoder.reset()
        payload = "abc"
        if encoder is not None:
            encoder.reset()
            payload = encoder.encode(payload)
        self.assertEqual(decoder.decode(payload), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        codec_names = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in codec_names:
            inner = codecs.getincrementaldecoder(enc)() if enc else enc
            decoder = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(decoder, enc)
        inner = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding_utf8(decoder)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def _check(dec):
            # U+0D00 and U+0A00 must pass through without being recorded
            # as newlines.
            self.assertEqual(dec.newlines, None)
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)

        for translate in (False, True):
            _check(self.IncrementalNewlineDecoder(None, translate=translate))
2749
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoder tests bound to the C io names.

    Adds nothing of its own: load_tests() copies the names of the ``io``
    module onto every test class whose name starts with "C".
    """
2752
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoder tests bound to the pyio names.

    Adds nothing of its own: load_tests() copies the names of the ``pyio``
    module onto every test class whose name starts with "Py".
    """
Antoine Pitrou180a3362008-12-14 16:36:46 +00002755
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002756
Guido van Rossum01a27522007-03-07 01:00:12 +00002757# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002758
Guido van Rossum5abbf752007-08-27 17:39:33 +00002759class MiscIOTest(unittest.TestCase):
2760
Barry Warsaw40e82462008-11-20 20:14:50 +00002761 def tearDown(self):
2762 support.unlink(support.TESTFN)
2763
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002764 def test___all__(self):
2765 for name in self.io.__all__:
2766 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002767 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002768 if name == "open":
2769 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002770 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002771 self.assertTrue(issubclass(obj, Exception), name)
2772 elif not name.startswith("SEEK_"):
2773 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002774
Barry Warsaw40e82462008-11-20 20:14:50 +00002775 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002776 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002777 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002778 f.close()
2779
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002780 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002781 self.assertEqual(f.name, support.TESTFN)
2782 self.assertEqual(f.buffer.name, support.TESTFN)
2783 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2784 self.assertEqual(f.mode, "U")
2785 self.assertEqual(f.buffer.mode, "rb")
2786 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002787 f.close()
2788
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002789 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002790 self.assertEqual(f.mode, "w+")
2791 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2792 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002793
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002794 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002795 self.assertEqual(g.mode, "wb")
2796 self.assertEqual(g.raw.mode, "wb")
2797 self.assertEqual(g.name, f.fileno())
2798 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00002799 f.close()
2800 g.close()
2801
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002802 def test_io_after_close(self):
2803 for kwargs in [
2804 {"mode": "w"},
2805 {"mode": "wb"},
2806 {"mode": "w", "buffering": 1},
2807 {"mode": "w", "buffering": 2},
2808 {"mode": "wb", "buffering": 0},
2809 {"mode": "r"},
2810 {"mode": "rb"},
2811 {"mode": "r", "buffering": 1},
2812 {"mode": "r", "buffering": 2},
2813 {"mode": "rb", "buffering": 0},
2814 {"mode": "w+"},
2815 {"mode": "w+b"},
2816 {"mode": "w+", "buffering": 1},
2817 {"mode": "w+", "buffering": 2},
2818 {"mode": "w+b", "buffering": 0},
2819 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002820 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002821 f.close()
2822 self.assertRaises(ValueError, f.flush)
2823 self.assertRaises(ValueError, f.fileno)
2824 self.assertRaises(ValueError, f.isatty)
2825 self.assertRaises(ValueError, f.__iter__)
2826 if hasattr(f, "peek"):
2827 self.assertRaises(ValueError, f.peek, 1)
2828 self.assertRaises(ValueError, f.read)
2829 if hasattr(f, "read1"):
2830 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02002831 if hasattr(f, "readall"):
2832 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002833 if hasattr(f, "readinto"):
2834 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2835 self.assertRaises(ValueError, f.readline)
2836 self.assertRaises(ValueError, f.readlines)
2837 self.assertRaises(ValueError, f.seek, 0)
2838 self.assertRaises(ValueError, f.tell)
2839 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002840 self.assertRaises(ValueError, f.write,
2841 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002842 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002843 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002844
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002845 def test_blockingioerror(self):
2846 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002847 class C(str):
2848 pass
2849 c = C("")
2850 b = self.BlockingIOError(1, c)
2851 c.b = b
2852 b.c = c
2853 wr = weakref.ref(c)
2854 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002855 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002856 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002857
2858 def test_abcs(self):
2859 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002860 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2861 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2862 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2863 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002864
2865 def _check_abc_inheritance(self, abcmodule):
2866 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002867 self.assertIsInstance(f, abcmodule.IOBase)
2868 self.assertIsInstance(f, abcmodule.RawIOBase)
2869 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2870 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002871 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002872 self.assertIsInstance(f, abcmodule.IOBase)
2873 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2874 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2875 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002876 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002877 self.assertIsInstance(f, abcmodule.IOBase)
2878 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2879 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2880 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002881
2882 def test_abc_inheritance(self):
2883 # Test implementations inherit from their respective ABCs
2884 self._check_abc_inheritance(self)
2885
2886 def test_abc_inheritance_official(self):
2887 # Test implementations inherit from the official ABCs of the
2888 # baseline "io" module.
2889 self._check_abc_inheritance(io)
2890
Antoine Pitroue033e062010-10-29 10:38:18 +00002891 def _check_warn_on_dealloc(self, *args, **kwargs):
2892 f = open(*args, **kwargs)
2893 r = repr(f)
2894 with self.assertWarns(ResourceWarning) as cm:
2895 f = None
2896 support.gc_collect()
2897 self.assertIn(r, str(cm.warning.args[0]))
2898
2899 def test_warn_on_dealloc(self):
2900 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
2901 self._check_warn_on_dealloc(support.TESTFN, "wb")
2902 self._check_warn_on_dealloc(support.TESTFN, "w")
2903
2904 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
2905 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00002906 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00002907 for fd in fds:
2908 try:
2909 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02002910 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00002911 if e.errno != errno.EBADF:
2912 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00002913 self.addCleanup(cleanup_fds)
2914 r, w = os.pipe()
2915 fds += r, w
2916 self._check_warn_on_dealloc(r, *args, **kwargs)
2917 # When using closefd=False, there's no warning
2918 r, w = os.pipe()
2919 fds += r, w
2920 with warnings.catch_warnings(record=True) as recorded:
2921 open(r, *args, closefd=False, **kwargs)
2922 support.gc_collect()
2923 self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00002924
2925 def test_warn_on_dealloc_fd(self):
2926 self._check_warn_on_dealloc_fd("rb", buffering=0)
2927 self._check_warn_on_dealloc_fd("rb")
2928 self._check_warn_on_dealloc_fd("r")
2929
2930
Antoine Pitrou243757e2010-11-05 21:15:39 +00002931 def test_pickling(self):
2932 # Pickling file objects is forbidden
2933 for kwargs in [
2934 {"mode": "w"},
2935 {"mode": "wb"},
2936 {"mode": "wb", "buffering": 0},
2937 {"mode": "r"},
2938 {"mode": "rb"},
2939 {"mode": "rb", "buffering": 0},
2940 {"mode": "w+"},
2941 {"mode": "w+b"},
2942 {"mode": "w+b", "buffering": 0},
2943 ]:
2944 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
2945 with self.open(support.TESTFN, **kwargs) as f:
2946 self.assertRaises(TypeError, pickle.dumps, f, protocol)
2947
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01002948 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2949 def test_nonblock_pipe_write_bigbuf(self):
2950 self._test_nonblock_pipe_write(16*1024)
2951
2952 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2953 def test_nonblock_pipe_write_smallbuf(self):
2954 self._test_nonblock_pipe_write(1024)
2955
2956 def _set_non_blocking(self, fd):
2957 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
2958 self.assertNotEqual(flags, -1)
2959 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
2960 self.assertEqual(res, 0)
2961
2962 def _test_nonblock_pipe_write(self, bufsize):
2963 sent = []
2964 received = []
2965 r, w = os.pipe()
2966 self._set_non_blocking(r)
2967 self._set_non_blocking(w)
2968
2969 # To exercise all code paths in the C implementation we need
2970 # to play with buffer sizes. For instance, if we choose a
2971 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
2972 # then we will never get a partial write of the buffer.
2973 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
2974 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
2975
2976 with rf, wf:
2977 for N in 9999, 73, 7574:
2978 try:
2979 i = 0
2980 while True:
2981 msg = bytes([i % 26 + 97]) * N
2982 sent.append(msg)
2983 wf.write(msg)
2984 i += 1
2985
2986 except self.BlockingIOError as e:
2987 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01002988 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01002989 sent[-1] = sent[-1][:e.characters_written]
2990 received.append(rf.read())
2991 msg = b'BLOCKED'
2992 wf.write(msg)
2993 sent.append(msg)
2994
2995 while True:
2996 try:
2997 wf.flush()
2998 break
2999 except self.BlockingIOError as e:
3000 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003001 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003002 self.assertEqual(e.characters_written, 0)
3003 received.append(rf.read())
3004
3005 received += iter(rf.read, None)
3006
3007 sent, received = b''.join(sent), b''.join(received)
3008 self.assertTrue(sent == received)
3009 self.assertTrue(wf.closed)
3010 self.assertTrue(rf.closed)
3011
Charles-François Natalidc3044c2012-01-09 22:40:02 +01003012 def test_create_fail(self):
3013 # 'x' mode fails if file is existing
3014 with self.open(support.TESTFN, 'w'):
3015 pass
3016 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
3017
3018 def test_create_writes(self):
3019 # 'x' mode opens for writing
3020 with self.open(support.TESTFN, 'xb') as f:
3021 f.write(b"spam")
3022 with self.open(support.TESTFN, 'rb') as f:
3023 self.assertEqual(b"spam", f.read())
3024
Christian Heimes7b648752012-09-10 14:48:43 +02003025 def test_open_allargs(self):
3026 # there used to be a buffer overflow in the parser for rawmode
3027 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
3028
3029
class CMiscIOTest(MiscIOTest):
    """MiscIOTest run against the ``io`` module, plus one extra check."""

    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025
        class OversizedReader(self.io.BufferedIOBase):
            # read() hands back far more data than any caller asked for.
            def read(self, n=-1):
                return b'x' * 10**6

        reader = OversizedReader()
        target = bytearray(2)
        with self.assertRaises(ValueError):
            reader.readinto(target)
3041
class PyMiscIOTest(MiscIOTest):
    """MiscIOTest run against the ``pyio`` module."""

    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003044
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003045
3046@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3047class SignalsTest(unittest.TestCase):
3048
3049 def setUp(self):
3050 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
3051
3052 def tearDown(self):
3053 signal.signal(signal.SIGALRM, self.oldalrm)
3054
3055 def alarm_interrupt(self, sig, frame):
3056 1/0
3057
3058 @unittest.skipUnless(threading, 'Threading required for this test.')
3059 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
3060 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00003061 invokes the signal handler, and bubbles up the exception raised
3062 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003063 read_results = []
3064 def _read():
Victor Stinnera9293352011-04-30 15:21:58 +02003065 if hasattr(signal, 'pthread_sigmask'):
3066 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003067 s = os.read(r, 1)
3068 read_results.append(s)
3069 t = threading.Thread(target=_read)
3070 t.daemon = True
3071 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00003072 fdopen_kwargs["closefd"] = False
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003073 try:
3074 wio = self.io.open(w, **fdopen_kwargs)
3075 t.start()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003076 # Fill the pipe enough that the write will be blocking.
3077 # It will be interrupted by the timer armed above. Since the
3078 # other thread has read one byte, the low-level write will
3079 # return with a successful (partial) result rather than an EINTR.
3080 # The buffered IO layer must check for pending signal
3081 # handlers, which in this case will invoke alarm_interrupt().
Victor Stinner775b2dd2013-07-15 19:53:13 +02003082 signal.alarm(1)
3083 try:
3084 self.assertRaises(ZeroDivisionError,
3085 wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
3086 finally:
3087 signal.alarm(0)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003088 t.join()
3089 # We got one byte, get another one and check that it isn't a
3090 # repeat of the first one.
3091 read_results.append(os.read(r, 1))
3092 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3093 finally:
3094 os.close(w)
3095 os.close(r)
3096 # This is deliberate. If we didn't close the file descriptor
3097 # before closing wio, wio would try to flush its internal
3098 # buffer, and block again.
3099 try:
3100 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003101 except OSError as e:
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003102 if e.errno != errno.EBADF:
3103 raise
3104
3105 def test_interrupted_write_unbuffered(self):
3106 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3107
3108 def test_interrupted_write_buffered(self):
3109 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3110
3111 def test_interrupted_write_text(self):
3112 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3113
Brett Cannon31f59292011-02-21 19:29:56 +00003114 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003115 def check_reentrant_write(self, data, **fdopen_kwargs):
3116 def on_alarm(*args):
3117 # Will be called reentrantly from the same thread
3118 wio.write(data)
3119 1/0
3120 signal.signal(signal.SIGALRM, on_alarm)
3121 r, w = os.pipe()
3122 wio = self.io.open(w, **fdopen_kwargs)
3123 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003124 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003125 # Either the reentrant call to wio.write() fails with RuntimeError,
3126 # or the signal handler raises ZeroDivisionError.
3127 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3128 while 1:
3129 for i in range(100):
3130 wio.write(data)
3131 wio.flush()
3132 # Make sure the buffer doesn't fill up and block further writes
3133 os.read(r, len(data) * 100)
3134 exc = cm.exception
3135 if isinstance(exc, RuntimeError):
3136 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3137 finally:
3138 wio.close()
3139 os.close(r)
3140
3141 def test_reentrant_write_buffered(self):
3142 self.check_reentrant_write(b"xy", mode="wb")
3143
3144 def test_reentrant_write_text(self):
3145 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3146
Antoine Pitrou707ce822011-02-25 21:24:11 +00003147 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3148 """Check that a buffered read, when it gets interrupted (either
3149 returning a partial result or EINTR), properly invokes the signal
3150 handler and retries if the latter returned successfully."""
3151 r, w = os.pipe()
3152 fdopen_kwargs["closefd"] = False
3153 def alarm_handler(sig, frame):
3154 os.write(w, b"bar")
3155 signal.signal(signal.SIGALRM, alarm_handler)
3156 try:
3157 rio = self.io.open(r, **fdopen_kwargs)
3158 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003159 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003160 # Expected behaviour:
3161 # - first raw read() returns partial b"foo"
3162 # - second raw read() returns EINTR
3163 # - third raw read() returns b"bar"
3164 self.assertEqual(decode(rio.read(6)), "foobar")
3165 finally:
3166 rio.close()
3167 os.close(w)
3168 os.close(r)
3169
Antoine Pitrou20db5112011-08-19 20:32:34 +02003170 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003171 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3172 mode="rb")
3173
Antoine Pitrou20db5112011-08-19 20:32:34 +02003174 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003175 self.check_interrupted_read_retry(lambda x: x,
3176 mode="r")
3177
3178 @unittest.skipUnless(threading, 'Threading required for this test.')
3179 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3180 """Check that a buffered write, when it gets interrupted (either
3181 returning a partial result or EINTR), properly invokes the signal
3182 handler and retries if the latter returned successfully."""
3183 select = support.import_module("select")
3184 # A quantity that exceeds the buffer size of an anonymous pipe's
3185 # write end.
Antoine Pitroue1a16742013-04-24 23:31:38 +02003186 N = support.PIPE_MAX_SIZE
Antoine Pitrou707ce822011-02-25 21:24:11 +00003187 r, w = os.pipe()
3188 fdopen_kwargs["closefd"] = False
3189 # We need a separate thread to read from the pipe and allow the
3190 # write() to finish. This thread is started after the SIGALRM is
3191 # received (forcing a first EINTR in write()).
3192 read_results = []
3193 write_finished = False
3194 def _read():
3195 while not write_finished:
3196 while r in select.select([r], [], [], 1.0)[0]:
3197 s = os.read(r, 1024)
3198 read_results.append(s)
3199 t = threading.Thread(target=_read)
3200 t.daemon = True
3201 def alarm1(sig, frame):
3202 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003203 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003204 def alarm2(sig, frame):
3205 t.start()
3206 signal.signal(signal.SIGALRM, alarm1)
3207 try:
3208 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003209 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003210 # Expected behaviour:
3211 # - first raw write() is partial (because of the limited pipe buffer
3212 # and the first alarm)
3213 # - second raw write() returns EINTR (because of the second alarm)
3214 # - subsequent write()s are successful (either partial or complete)
3215 self.assertEqual(N, wio.write(item * N))
3216 wio.flush()
3217 write_finished = True
3218 t.join()
3219 self.assertEqual(N, sum(len(x) for x in read_results))
3220 finally:
3221 write_finished = True
3222 os.close(w)
3223 os.close(r)
3224 # This is deliberate. If we didn't close the file descriptor
3225 # before closing wio, wio would try to flush its internal
3226 # buffer, and could block (in case of failure).
3227 try:
3228 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003229 except OSError as e:
Antoine Pitrou707ce822011-02-25 21:24:11 +00003230 if e.errno != errno.EBADF:
3231 raise
3232
Antoine Pitrou20db5112011-08-19 20:32:34 +02003233 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003234 self.check_interrupted_write_retry(b"x", mode="wb")
3235
Antoine Pitrou20db5112011-08-19 20:32:34 +02003236 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003237 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3238
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003239
class CSignalsTest(SignalsTest):
    """SignalsTest run against the ``io`` module."""

    io = io
3242
class PySignalsTest(SignalsTest):
    """SignalsTest run against the ``pyio`` module."""

    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = test_reentrant_write_text = None
3250
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003251
def load_tests(*args):
    """Return the suite of io tests, binding each class to its implementation.

    Before the suite is built, every test class whose name starts with "C"
    receives the names of the ``io`` module and every class whose name starts
    with "Py" receives the names of ``pyio``, together with the matching
    C*/Py* variants of the mock classes.  Other classes are left untouched.
    The arguments passed by unittest are ignored.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name: getattr(io, name) for name in all_members}
    py_io_ns = {name: getattr(pyio, name) for name in all_members}
    globs = globals()
    # Each mock is registered under its plain name but resolved to the
    # implementation-specific class ("C" + name or "Py" + name).
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            namespace = c_io_ns
        elif test.__name__.startswith("Py"):
            namespace = py_io_ns
        else:
            continue
        for name, obj in namespace.items():
            setattr(test, name, obj)

    # loadTestsFromTestCase() replaces the deprecated unittest.makeSuite().
    load_case = unittest.defaultTestLoader.loadTestsFromTestCase
    return unittest.TestSuite([load_case(test) for test in tests])
Guido van Rossum28524c72007-02-27 05:47:44 +00003287
# Run the tests when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()