blob: 8ef314e55d20ea3073197af75d4616a19f349fb0 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import random
Guido van Rossum28524c72007-02-27 05:47:44 +000027import unittest
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000028import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000029import abc
Antoine Pitroub46b9d52010-08-21 19:09:32 +000030import signal
31import errno
Antoine Pitroue033e062010-10-29 10:38:18 +000032import warnings
Antoine Pitrou243757e2010-11-05 21:15:39 +000033import pickle
Serhiy Storchaka441d30f2013-01-19 12:26:26 +020034import _testcapi
Georg Brandl1b37e872010-03-14 10:45:50 +000035from itertools import cycle, count
Antoine Pitrou131a4892012-10-16 22:57:11 +020036from collections import deque, UserList
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000038
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000039import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000040import io # C implementation of io
41import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000042try:
43 import threading
44except ImportError:
45 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010046try:
47 import fcntl
48except ImportError:
49 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000050
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000051def _default_chunk_size():
52 """Get the default TextIOWrapper chunk size"""
53 with open(__file__, "r", encoding="latin1") as f:
54 return f._CHUNK_SIZE
55
56
Antoine Pitrou328ec742010-09-14 18:37:24 +000057class MockRawIOWithoutRead:
58 """A RawIO implementation without read(), so as to exercise the default
59 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000060
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000061 def __init__(self, read_stack=()):
62 self._read_stack = list(read_stack)
63 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000064 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000065 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000066
Guido van Rossum01a27522007-03-07 01:00:12 +000067 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000068 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000069 return len(b)
70
71 def writable(self):
72 return True
73
Guido van Rossum68bbcd22007-02-27 17:19:33 +000074 def fileno(self):
75 return 42
76
77 def readable(self):
78 return True
79
Guido van Rossum01a27522007-03-07 01:00:12 +000080 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000081 return True
82
Guido van Rossum01a27522007-03-07 01:00:12 +000083 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000084 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000085
86 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000087 return 0 # same comment as above
88
89 def readinto(self, buf):
90 self._reads += 1
91 max_len = len(buf)
92 try:
93 data = self._read_stack[0]
94 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +000095 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000096 return 0
97 if data is None:
98 del self._read_stack[0]
99 return None
100 n = len(data)
101 if len(data) <= max_len:
102 del self._read_stack[0]
103 buf[:n] = data
104 return n
105 else:
106 buf[:] = data[:max_len]
107 self._read_stack[0] = data[max_len:]
108 return max_len
109
110 def truncate(self, pos=None):
111 return pos
112
Antoine Pitrou328ec742010-09-14 18:37:24 +0000113class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
114 pass
115
116class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
117 pass
118
119
120class MockRawIO(MockRawIOWithoutRead):
121
122 def read(self, n=None):
123 self._reads += 1
124 try:
125 return self._read_stack.pop(0)
126 except:
127 self._extraneous_reads += 1
128 return b""
129
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000130class CMockRawIO(MockRawIO, io.RawIOBase):
131 pass
132
133class PyMockRawIO(MockRawIO, pyio.RawIOBase):
134 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000135
Guido van Rossuma9e20242007-03-08 00:43:48 +0000136
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000137class MisbehavedRawIO(MockRawIO):
138 def write(self, b):
139 return super().write(b) * 2
140
141 def read(self, n=None):
142 return super().read(n) * 2
143
144 def seek(self, pos, whence):
145 return -123
146
147 def tell(self):
148 return -456
149
150 def readinto(self, buf):
151 super().readinto(buf)
152 return len(buf) * 5
153
154class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
155 pass
156
157class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
158 pass
159
160
161class CloseFailureIO(MockRawIO):
162 closed = 0
163
164 def close(self):
165 if not self.closed:
166 self.closed = 1
167 raise IOError
168
169class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
170 pass
171
172class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
173 pass
174
175
176class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000177
178 def __init__(self, data):
179 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000180 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000181
182 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000183 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000184 self.read_history.append(None if res is None else len(res))
185 return res
186
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000187 def readinto(self, b):
188 res = super().readinto(b)
189 self.read_history.append(res)
190 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000191
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000192class CMockFileIO(MockFileIO, io.BytesIO):
193 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000194
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000195class PyMockFileIO(MockFileIO, pyio.BytesIO):
196 pass
197
198
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000199class MockUnseekableIO:
200 def seekable(self):
201 return False
202
203 def seek(self, *args):
204 raise self.UnsupportedOperation("not seekable")
205
206 def tell(self, *args):
207 raise self.UnsupportedOperation("not seekable")
208
209class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
210 UnsupportedOperation = io.UnsupportedOperation
211
212class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
213 UnsupportedOperation = pyio.UnsupportedOperation
214
215
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000216class MockNonBlockWriterIO:
217
218 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000219 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000220 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000221
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000222 def pop_written(self):
223 s = b"".join(self._write_stack)
224 self._write_stack[:] = []
225 return s
226
227 def block_on(self, char):
228 """Block when a given char is encountered."""
229 self._blocker_char = char
230
231 def readable(self):
232 return True
233
234 def seekable(self):
235 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000236
Guido van Rossum01a27522007-03-07 01:00:12 +0000237 def writable(self):
238 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000239
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000240 def write(self, b):
241 b = bytes(b)
242 n = -1
243 if self._blocker_char:
244 try:
245 n = b.index(self._blocker_char)
246 except ValueError:
247 pass
248 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100249 if n > 0:
250 # write data up to the first blocker
251 self._write_stack.append(b[:n])
252 return n
253 else:
254 # cancel blocker and indicate would block
255 self._blocker_char = None
256 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000257 self._write_stack.append(b)
258 return len(b)
259
260class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
261 BlockingIOError = io.BlockingIOError
262
263class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
264 BlockingIOError = pyio.BlockingIOError
265
Guido van Rossuma9e20242007-03-08 00:43:48 +0000266
Guido van Rossum28524c72007-02-27 05:47:44 +0000267class IOTest(unittest.TestCase):
268
Neal Norwitze7789b12008-03-24 06:18:09 +0000269 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000270 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000271
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000272 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000273 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000274
Guido van Rossum28524c72007-02-27 05:47:44 +0000275 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000276 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000277 f.truncate(0)
278 self.assertEqual(f.tell(), 5)
279 f.seek(0)
280
281 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000282 self.assertEqual(f.seek(0), 0)
283 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000284 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000285 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000286 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000287 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000288 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000289 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000290 self.assertEqual(f.seek(-1, 2), 13)
291 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000292
Guido van Rossum87429772007-04-10 21:06:59 +0000293 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000294 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000295 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000296
Guido van Rossum9b76da62007-04-11 01:09:03 +0000297 def read_ops(self, f, buffered=False):
298 data = f.read(5)
299 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000300 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000301 self.assertEqual(f.readinto(data), 5)
302 self.assertEqual(data, b" worl")
303 self.assertEqual(f.readinto(data), 2)
304 self.assertEqual(len(data), 5)
305 self.assertEqual(data[:2], b"d\n")
306 self.assertEqual(f.seek(0), 0)
307 self.assertEqual(f.read(20), b"hello world\n")
308 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000309 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000310 self.assertEqual(f.seek(-6, 2), 6)
311 self.assertEqual(f.read(5), b"world")
312 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000313 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000314 self.assertEqual(f.seek(-6, 1), 5)
315 self.assertEqual(f.read(5), b" worl")
316 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000317 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000318 if buffered:
319 f.seek(0)
320 self.assertEqual(f.read(), b"hello world\n")
321 f.seek(6)
322 self.assertEqual(f.read(), b"world\n")
323 self.assertEqual(f.read(), b"")
324
Guido van Rossum34d69e52007-04-10 20:08:41 +0000325 LARGE = 2**31
326
Guido van Rossum53807da2007-04-10 19:01:47 +0000327 def large_file_ops(self, f):
328 assert f.readable()
329 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000330 self.assertEqual(f.seek(self.LARGE), self.LARGE)
331 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000332 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000333 self.assertEqual(f.tell(), self.LARGE + 3)
334 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000335 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000336 self.assertEqual(f.tell(), self.LARGE + 2)
337 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000338 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000339 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000340 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
341 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000342 self.assertEqual(f.read(2), b"x")
343
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000344 def test_invalid_operations(self):
345 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000346 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000347 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000348 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000349 self.assertRaises(exc, fp.read)
350 self.assertRaises(exc, fp.readline)
351 with self.open(support.TESTFN, "wb", buffering=0) as fp:
352 self.assertRaises(exc, fp.read)
353 self.assertRaises(exc, fp.readline)
354 with self.open(support.TESTFN, "rb", buffering=0) as fp:
355 self.assertRaises(exc, fp.write, b"blah")
356 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000357 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000358 self.assertRaises(exc, fp.write, b"blah")
359 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000360 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000361 self.assertRaises(exc, fp.write, "blah")
362 self.assertRaises(exc, fp.writelines, ["blah\n"])
363 # Non-zero seeking from current or end pos
364 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
365 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000366
Antoine Pitrou13348842012-01-29 18:36:34 +0100367 def test_open_handles_NUL_chars(self):
368 fn_with_NUL = 'foo\0bar'
369 self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
370 self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')
371
Guido van Rossum28524c72007-02-27 05:47:44 +0000372 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000373 with self.open(support.TESTFN, "wb", buffering=0) as f:
374 self.assertEqual(f.readable(), False)
375 self.assertEqual(f.writable(), True)
376 self.assertEqual(f.seekable(), True)
377 self.write_ops(f)
378 with self.open(support.TESTFN, "rb", buffering=0) as f:
379 self.assertEqual(f.readable(), True)
380 self.assertEqual(f.writable(), False)
381 self.assertEqual(f.seekable(), True)
382 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000383
Guido van Rossum87429772007-04-10 21:06:59 +0000384 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000385 with self.open(support.TESTFN, "wb") as f:
386 self.assertEqual(f.readable(), False)
387 self.assertEqual(f.writable(), True)
388 self.assertEqual(f.seekable(), True)
389 self.write_ops(f)
390 with self.open(support.TESTFN, "rb") as f:
391 self.assertEqual(f.readable(), True)
392 self.assertEqual(f.writable(), False)
393 self.assertEqual(f.seekable(), True)
394 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000395
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000396 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000397 with self.open(support.TESTFN, "wb") as f:
398 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
399 with self.open(support.TESTFN, "rb") as f:
400 self.assertEqual(f.readline(), b"abc\n")
401 self.assertEqual(f.readline(10), b"def\n")
402 self.assertEqual(f.readline(2), b"xy")
403 self.assertEqual(f.readline(4), b"zzy\n")
404 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000405 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000406 self.assertRaises(TypeError, f.readline, 5.3)
407 with self.open(support.TESTFN, "r") as f:
408 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000409
Guido van Rossum28524c72007-02-27 05:47:44 +0000410 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000411 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000412 self.write_ops(f)
413 data = f.getvalue()
414 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000415 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000416 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000417
Guido van Rossum53807da2007-04-10 19:01:47 +0000418 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000419 # On Windows and Mac OSX this test comsumes large resources; It takes
420 # a long time to build the >2GB file and takes >2GB of disk space
421 # therefore the resource must be enabled to run this test.
422 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000423 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000424 print("\nTesting large file ops skipped on %s." % sys.platform,
425 file=sys.stderr)
426 print("It requires %d bytes and a long time." % self.LARGE,
427 file=sys.stderr)
428 print("Use 'regrtest.py -u largefile test_io' to run it.",
429 file=sys.stderr)
430 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000431 with self.open(support.TESTFN, "w+b", 0) as f:
432 self.large_file_ops(f)
433 with self.open(support.TESTFN, "w+b") as f:
434 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000435
436 def test_with_open(self):
437 for bufsize in (0, 1, 100):
438 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000439 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000440 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000441 self.assertEqual(f.closed, True)
442 f = None
443 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000444 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000445 1/0
446 except ZeroDivisionError:
447 self.assertEqual(f.closed, True)
448 else:
449 self.fail("1/0 didn't raise an exception")
450
Antoine Pitrou08838b62009-01-21 00:55:13 +0000451 # issue 5008
452 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000453 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000454 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000455 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000456 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000457 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000458 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000459 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000460 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000461
Guido van Rossum87429772007-04-10 21:06:59 +0000462 def test_destructor(self):
463 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000464 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000465 def __del__(self):
466 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000467 try:
468 f = super().__del__
469 except AttributeError:
470 pass
471 else:
472 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000473 def close(self):
474 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000475 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000476 def flush(self):
477 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000478 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000479 with support.check_warnings(('', ResourceWarning)):
480 f = MyFileIO(support.TESTFN, "wb")
481 f.write(b"xxx")
482 del f
483 support.gc_collect()
484 self.assertEqual(record, [1, 2, 3])
485 with self.open(support.TESTFN, "rb") as f:
486 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000487
488 def _check_base_destructor(self, base):
489 record = []
490 class MyIO(base):
491 def __init__(self):
492 # This exercises the availability of attributes on object
493 # destruction.
494 # (in the C version, close() is called by the tp_dealloc
495 # function, not by __del__)
496 self.on_del = 1
497 self.on_close = 2
498 self.on_flush = 3
499 def __del__(self):
500 record.append(self.on_del)
501 try:
502 f = super().__del__
503 except AttributeError:
504 pass
505 else:
506 f()
507 def close(self):
508 record.append(self.on_close)
509 super().close()
510 def flush(self):
511 record.append(self.on_flush)
512 super().flush()
513 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000514 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000515 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000516 self.assertEqual(record, [1, 2, 3])
517
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000518 def test_IOBase_destructor(self):
519 self._check_base_destructor(self.IOBase)
520
521 def test_RawIOBase_destructor(self):
522 self._check_base_destructor(self.RawIOBase)
523
524 def test_BufferedIOBase_destructor(self):
525 self._check_base_destructor(self.BufferedIOBase)
526
527 def test_TextIOBase_destructor(self):
528 self._check_base_destructor(self.TextIOBase)
529
Guido van Rossum87429772007-04-10 21:06:59 +0000530 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000531 with self.open(support.TESTFN, "wb") as f:
532 f.write(b"xxx")
533 with self.open(support.TESTFN, "rb") as f:
534 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000535
Guido van Rossumd4103952007-04-12 05:44:49 +0000536 def test_array_writes(self):
537 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000538 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000539 with self.open(support.TESTFN, "wb", 0) as f:
540 self.assertEqual(f.write(a), n)
541 with self.open(support.TESTFN, "wb") as f:
542 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000543
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000544 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000545 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000546 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000547
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000548 def test_read_closed(self):
549 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000550 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000551 with self.open(support.TESTFN, "r") as f:
552 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000553 self.assertEqual(file.read(), "egg\n")
554 file.seek(0)
555 file.close()
556 self.assertRaises(ValueError, file.read)
557
558 def test_no_closefd_with_filename(self):
559 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000560 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000561
562 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000563 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000564 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000565 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000566 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000567 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000568 self.assertEqual(file.buffer.raw.closefd, False)
569
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000570 def test_garbage_collection(self):
571 # FileIO objects are collected, and collecting them flushes
572 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000573 with support.check_warnings(('', ResourceWarning)):
574 f = self.FileIO(support.TESTFN, "wb")
575 f.write(b"abcxxx")
576 f.f = f
577 wr = weakref.ref(f)
578 del f
579 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000580 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000581 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000582 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000583
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000584 def test_unbounded_file(self):
585 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
586 zero = "/dev/zero"
587 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000588 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000589 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000590 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000591 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000592 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000593 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000594 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000595 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000596 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000597 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000598 self.assertRaises(OverflowError, f.read)
599
Antoine Pitrou6be88762010-05-03 16:48:20 +0000600 def test_flush_error_on_close(self):
601 f = self.open(support.TESTFN, "wb", buffering=0)
602 def bad_flush():
603 raise IOError()
604 f.flush = bad_flush
605 self.assertRaises(IOError, f.close) # exception not swallowed
606
607 def test_multi_close(self):
608 f = self.open(support.TESTFN, "wb", buffering=0)
609 f.close()
610 f.close()
611 f.close()
612 self.assertRaises(ValueError, f.flush)
613
Antoine Pitrou328ec742010-09-14 18:37:24 +0000614 def test_RawIOBase_read(self):
615 # Exercise the default RawIOBase.read() implementation (which calls
616 # readinto() internally).
617 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
618 self.assertEqual(rawio.read(2), b"ab")
619 self.assertEqual(rawio.read(2), b"c")
620 self.assertEqual(rawio.read(2), b"d")
621 self.assertEqual(rawio.read(2), None)
622 self.assertEqual(rawio.read(2), b"ef")
623 self.assertEqual(rawio.read(2), b"g")
624 self.assertEqual(rawio.read(2), None)
625 self.assertEqual(rawio.read(2), b"")
626
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400627 def test_types_have_dict(self):
628 test = (
629 self.IOBase(),
630 self.RawIOBase(),
631 self.TextIOBase(),
632 self.StringIO(),
633 self.BytesIO()
634 )
635 for obj in test:
636 self.assertTrue(hasattr(obj, "__dict__"))
637
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200638 def test_fileio_closefd(self):
639 # Issue #4841
640 with self.open(__file__, 'rb') as f1, \
641 self.open(__file__, 'rb') as f2:
642 fileio = self.FileIO(f1.fileno(), closefd=False)
643 # .__init__() must not close f1
644 fileio.__init__(f2.fileno(), closefd=False)
645 f1.readline()
646 # .close() must not close f2
647 fileio.close()
648 f2.readline()
649
650
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000651class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +0200652
653 def test_IOBase_finalize(self):
654 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
655 # class which inherits IOBase and an object of this class are caught
656 # in a reference cycle and close() is already in the method cache.
657 class MyIO(self.IOBase):
658 def close(self):
659 pass
660
661 # create an instance to populate the method cache
662 MyIO()
663 obj = MyIO()
664 obj.obj = obj
665 wr = weakref.ref(obj)
666 del MyIO
667 del obj
668 support.gc_collect()
669 self.assertTrue(wr() is None, wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000670
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000671class PyIOTest(IOTest):
672 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000673
Guido van Rossuma9e20242007-03-08 00:43:48 +0000674
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000675class CommonBufferedTests:
676 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
677
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000678 def test_detach(self):
679 raw = self.MockRawIO()
680 buf = self.tp(raw)
681 self.assertIs(buf.detach(), raw)
682 self.assertRaises(ValueError, buf.detach)
683
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000684 def test_fileno(self):
685 rawio = self.MockRawIO()
686 bufio = self.tp(rawio)
687
Ezio Melottib3aedd42010-11-20 19:04:17 +0000688 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000689
690 def test_no_fileno(self):
691 # XXX will we always have fileno() function? If so, kill
692 # this test. Else, write it.
693 pass
694
695 def test_invalid_args(self):
696 rawio = self.MockRawIO()
697 bufio = self.tp(rawio)
698 # Invalid whence
699 self.assertRaises(ValueError, bufio.seek, 0, -1)
700 self.assertRaises(ValueError, bufio.seek, 0, 3)
701
702 def test_override_destructor(self):
703 tp = self.tp
704 record = []
705 class MyBufferedIO(tp):
706 def __del__(self):
707 record.append(1)
708 try:
709 f = super().__del__
710 except AttributeError:
711 pass
712 else:
713 f()
714 def close(self):
715 record.append(2)
716 super().close()
717 def flush(self):
718 record.append(3)
719 super().flush()
720 rawio = self.MockRawIO()
721 bufio = MyBufferedIO(rawio)
722 writable = bufio.writable()
723 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000724 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000725 if writable:
726 self.assertEqual(record, [1, 2, 3])
727 else:
728 self.assertEqual(record, [1, 2])
729
730 def test_context_manager(self):
731 # Test usability as a context manager
732 rawio = self.MockRawIO()
733 bufio = self.tp(rawio)
734 def _with():
735 with bufio:
736 pass
737 _with()
738 # bufio should now be closed, and using it a second time should raise
739 # a ValueError.
740 self.assertRaises(ValueError, _with)
741
742 def test_error_through_destructor(self):
743 # Test that the exception state is not modified by a destructor,
744 # even if close() fails.
745 rawio = self.CloseFailureIO()
746 def f():
747 self.tp(rawio).xyzzy
748 with support.captured_output("stderr") as s:
749 self.assertRaises(AttributeError, f)
750 s = s.getvalue().strip()
751 if s:
752 # The destructor *may* have printed an unraisable error, check it
753 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000754 self.assertTrue(s.startswith("Exception IOError: "), s)
755 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000756
Antoine Pitrou716c4442009-05-23 19:04:03 +0000757 def test_repr(self):
758 raw = self.MockRawIO()
759 b = self.tp(raw)
760 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
761 self.assertEqual(repr(b), "<%s>" % clsname)
762 raw.name = "dummy"
763 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
764 raw.name = b"dummy"
765 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
766
Antoine Pitrou6be88762010-05-03 16:48:20 +0000767 def test_flush_error_on_close(self):
768 raw = self.MockRawIO()
769 def bad_flush():
770 raise IOError()
771 raw.flush = bad_flush
772 b = self.tp(raw)
773 self.assertRaises(IOError, b.close) # exception not swallowed
774
775 def test_multi_close(self):
776 raw = self.MockRawIO()
777 b = self.tp(raw)
778 b.close()
779 b.close()
780 b.close()
781 self.assertRaises(ValueError, b.flush)
782
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000783 def test_unseekable(self):
784 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
785 self.assertRaises(self.UnsupportedOperation, bufio.tell)
786 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
787
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000788 def test_readonly_attributes(self):
789 raw = self.MockRawIO()
790 buf = self.tp(raw)
791 x = self.MockRawIO()
792 with self.assertRaises(AttributeError):
793 buf.raw = x
794
Guido van Rossum78892e42007-04-06 17:31:18 +0000795
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200796class SizeofTest:
797
798 @support.cpython_only
799 def test_sizeof(self):
800 bufsize1 = 4096
801 bufsize2 = 8192
802 rawio = self.MockRawIO()
803 bufio = self.tp(rawio, buffer_size=bufsize1)
804 size = sys.getsizeof(bufio) - bufsize1
805 rawio = self.MockRawIO()
806 bufio = self.tp(rawio, buffer_size=bufsize2)
807 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
808
809
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000810class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
811 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000812
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000813 def test_constructor(self):
814 rawio = self.MockRawIO([b"abc"])
815 bufio = self.tp(rawio)
816 bufio.__init__(rawio)
817 bufio.__init__(rawio, buffer_size=1024)
818 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000819 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000820 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
821 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
822 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
823 rawio = self.MockRawIO([b"abc"])
824 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000825 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000826
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000827 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000828 for arg in (None, 7):
829 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
830 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000831 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000832 # Invalid args
833 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000834
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000835 def test_read1(self):
836 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
837 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000838 self.assertEqual(b"a", bufio.read(1))
839 self.assertEqual(b"b", bufio.read1(1))
840 self.assertEqual(rawio._reads, 1)
841 self.assertEqual(b"c", bufio.read1(100))
842 self.assertEqual(rawio._reads, 1)
843 self.assertEqual(b"d", bufio.read1(100))
844 self.assertEqual(rawio._reads, 2)
845 self.assertEqual(b"efg", bufio.read1(100))
846 self.assertEqual(rawio._reads, 3)
847 self.assertEqual(b"", bufio.read1(100))
848 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000849 # Invalid args
850 self.assertRaises(ValueError, bufio.read1, -1)
851
852 def test_readinto(self):
853 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
854 bufio = self.tp(rawio)
855 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000856 self.assertEqual(bufio.readinto(b), 2)
857 self.assertEqual(b, b"ab")
858 self.assertEqual(bufio.readinto(b), 2)
859 self.assertEqual(b, b"cd")
860 self.assertEqual(bufio.readinto(b), 2)
861 self.assertEqual(b, b"ef")
862 self.assertEqual(bufio.readinto(b), 1)
863 self.assertEqual(b, b"gf")
864 self.assertEqual(bufio.readinto(b), 0)
865 self.assertEqual(b, b"gf")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000866
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000867 def test_readlines(self):
868 def bufio():
869 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
870 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000871 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
872 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
873 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000874
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000875 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000876 data = b"abcdefghi"
877 dlen = len(data)
878
879 tests = [
880 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
881 [ 100, [ 3, 3, 3], [ dlen ] ],
882 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
883 ]
884
885 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000886 rawio = self.MockFileIO(data)
887 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000888 pos = 0
889 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +0000890 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000891 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000892 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +0000893 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000894
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000895 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000896 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000897 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
898 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000899 self.assertEqual(b"abcd", bufio.read(6))
900 self.assertEqual(b"e", bufio.read(1))
901 self.assertEqual(b"fg", bufio.read())
902 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +0200903 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000904 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000905
Victor Stinnera80987f2011-05-25 22:47:16 +0200906 rawio = self.MockRawIO((b"a", None, None))
907 self.assertEqual(b"a", rawio.readall())
908 self.assertIsNone(rawio.readall())
909
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000910 def test_read_past_eof(self):
911 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
912 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000913
Ezio Melottib3aedd42010-11-20 19:04:17 +0000914 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000915
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000916 def test_read_all(self):
917 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
918 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000919
Ezio Melottib3aedd42010-11-20 19:04:17 +0000920 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000921
Victor Stinner45df8202010-04-28 22:31:17 +0000922 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +0000923 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000924 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000925 try:
926 # Write out many bytes with exactly the same number of 0's,
927 # 1's... 255's. This will help us check that concurrent reading
928 # doesn't duplicate or forget contents.
929 N = 1000
930 l = list(range(256)) * N
931 random.shuffle(l)
932 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000933 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +0000934 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000935 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000936 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000937 errors = []
938 results = []
939 def f():
940 try:
941 # Intra-buffer read then buffer-flushing read
942 for n in cycle([1, 19]):
943 s = bufio.read(n)
944 if not s:
945 break
946 # list.append() is atomic
947 results.append(s)
948 except Exception as e:
949 errors.append(e)
950 raise
951 threads = [threading.Thread(target=f) for x in range(20)]
952 for t in threads:
953 t.start()
954 time.sleep(0.02) # yield
955 for t in threads:
956 t.join()
957 self.assertFalse(errors,
958 "the following exceptions were caught: %r" % errors)
959 s = b''.join(results)
960 for i in range(256):
961 c = bytes(bytearray([i]))
962 self.assertEqual(s.count(c), N)
963 finally:
964 support.unlink(support.TESTFN)
965
Antoine Pitrou1e44fec2011-10-04 12:26:20 +0200966 def test_unseekable(self):
967 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
968 self.assertRaises(self.UnsupportedOperation, bufio.tell)
969 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
970 bufio.read(1)
971 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
972 self.assertRaises(self.UnsupportedOperation, bufio.tell)
973
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000974 def test_misbehaved_io(self):
975 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
976 bufio = self.tp(rawio)
977 self.assertRaises(IOError, bufio.seek, 0)
978 self.assertRaises(IOError, bufio.tell)
979
Antoine Pitrou32cfede2010-08-11 13:31:33 +0000980 def test_no_extraneous_read(self):
981 # Issue #9550; when the raw IO object has satisfied the read request,
982 # we should not issue any additional reads, otherwise it may block
983 # (e.g. socket).
984 bufsize = 16
985 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
986 rawio = self.MockRawIO([b"x" * n])
987 bufio = self.tp(rawio, bufsize)
988 self.assertEqual(bufio.read(n), b"x" * n)
989 # Simple case: one raw read is enough to satisfy the request.
990 self.assertEqual(rawio._extraneous_reads, 0,
991 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
992 # A more complex case where two raw reads are needed to satisfy
993 # the request.
994 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
995 bufio = self.tp(rawio, bufsize)
996 self.assertEqual(bufio.read(n), b"x" * n)
997 self.assertEqual(rawio._extraneous_reads, 0,
998 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
999
1000
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001001class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001002 tp = io.BufferedReader
1003
1004 def test_constructor(self):
1005 BufferedReaderTest.test_constructor(self)
1006 # The allocation can succeed on 32-bit builds, e.g. with more
1007 # than 2GB RAM and a 64-bit kernel.
1008 if sys.maxsize > 0x7FFFFFFF:
1009 rawio = self.MockRawIO()
1010 bufio = self.tp(rawio)
1011 self.assertRaises((OverflowError, MemoryError, ValueError),
1012 bufio.__init__, rawio, sys.maxsize)
1013
1014 def test_initialization(self):
1015 rawio = self.MockRawIO([b"abc"])
1016 bufio = self.tp(rawio)
1017 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1018 self.assertRaises(ValueError, bufio.read)
1019 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1020 self.assertRaises(ValueError, bufio.read)
1021 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1022 self.assertRaises(ValueError, bufio.read)
1023
1024 def test_misbehaved_io_read(self):
1025 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1026 bufio = self.tp(rawio)
1027 # _pyio.BufferedReader seems to implement reading different, so that
1028 # checking this is not so easy.
1029 self.assertRaises(IOError, bufio.read, 10)
1030
1031 def test_garbage_collection(self):
1032 # C BufferedReader objects are collected.
1033 # The Python version has __del__, so it ends into gc.garbage instead
1034 rawio = self.FileIO(support.TESTFN, "w+b")
1035 f = self.tp(rawio)
1036 f.f = f
1037 wr = weakref.ref(f)
1038 del f
Benjamin Peterson45cec322009-04-24 23:14:50 +00001039 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001040 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001041
R David Murray67bfe802013-02-23 21:51:05 -05001042 def test_args_error(self):
1043 # Issue #17275
1044 with self.assertRaisesRegex(TypeError, "BufferedReader"):
1045 self.tp(io.BytesIO(), 1024, 1024, 1024)
1046
1047
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001048class PyBufferedReaderTest(BufferedReaderTest):
1049 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001050
Guido van Rossuma9e20242007-03-08 00:43:48 +00001051
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001052class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1053 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001054
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001055 def test_constructor(self):
1056 rawio = self.MockRawIO()
1057 bufio = self.tp(rawio)
1058 bufio.__init__(rawio)
1059 bufio.__init__(rawio, buffer_size=1024)
1060 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001061 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001062 bufio.flush()
1063 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1064 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1065 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1066 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001067 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001068 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001069 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001070
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001071 def test_detach_flush(self):
1072 raw = self.MockRawIO()
1073 buf = self.tp(raw)
1074 buf.write(b"howdy!")
1075 self.assertFalse(raw._write_stack)
1076 buf.detach()
1077 self.assertEqual(raw._write_stack, [b"howdy!"])
1078
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001079 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001080 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001081 writer = self.MockRawIO()
1082 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001083 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001084 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001085
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001086 def test_write_overflow(self):
1087 writer = self.MockRawIO()
1088 bufio = self.tp(writer, 8)
1089 contents = b"abcdefghijklmnop"
1090 for n in range(0, len(contents), 3):
1091 bufio.write(contents[n:n+3])
1092 flushed = b"".join(writer._write_stack)
1093 # At least (total - 8) bytes were implicitly flushed, perhaps more
1094 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001095 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001096
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001097 def check_writes(self, intermediate_func):
1098 # Lots of writes, test the flushed output is as expected.
1099 contents = bytes(range(256)) * 1000
1100 n = 0
1101 writer = self.MockRawIO()
1102 bufio = self.tp(writer, 13)
1103 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1104 def gen_sizes():
1105 for size in count(1):
1106 for i in range(15):
1107 yield size
1108 sizes = gen_sizes()
1109 while n < len(contents):
1110 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001111 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001112 intermediate_func(bufio)
1113 n += size
1114 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001115 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001116
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001117 def test_writes(self):
1118 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001119
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001120 def test_writes_and_flushes(self):
1121 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001122
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001123 def test_writes_and_seeks(self):
1124 def _seekabs(bufio):
1125 pos = bufio.tell()
1126 bufio.seek(pos + 1, 0)
1127 bufio.seek(pos - 1, 0)
1128 bufio.seek(pos, 0)
1129 self.check_writes(_seekabs)
1130 def _seekrel(bufio):
1131 pos = bufio.seek(0, 1)
1132 bufio.seek(+1, 1)
1133 bufio.seek(-1, 1)
1134 bufio.seek(pos, 0)
1135 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001136
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001137 def test_writes_and_truncates(self):
1138 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001139
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001140 def test_write_non_blocking(self):
1141 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001142 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001143
Ezio Melottib3aedd42010-11-20 19:04:17 +00001144 self.assertEqual(bufio.write(b"abcd"), 4)
1145 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001146 # 1 byte will be written, the rest will be buffered
1147 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001148 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001149
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001150 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1151 raw.block_on(b"0")
1152 try:
1153 bufio.write(b"opqrwxyz0123456789")
1154 except self.BlockingIOError as e:
1155 written = e.characters_written
1156 else:
1157 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001158 self.assertEqual(written, 16)
1159 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001160 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001161
Ezio Melottib3aedd42010-11-20 19:04:17 +00001162 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001163 s = raw.pop_written()
1164 # Previously buffered bytes were flushed
1165 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001166
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001167 def test_write_and_rewind(self):
1168 raw = io.BytesIO()
1169 bufio = self.tp(raw, 4)
1170 self.assertEqual(bufio.write(b"abcdef"), 6)
1171 self.assertEqual(bufio.tell(), 6)
1172 bufio.seek(0, 0)
1173 self.assertEqual(bufio.write(b"XY"), 2)
1174 bufio.seek(6, 0)
1175 self.assertEqual(raw.getvalue(), b"XYcdef")
1176 self.assertEqual(bufio.write(b"123456"), 6)
1177 bufio.flush()
1178 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001179
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001180 def test_flush(self):
1181 writer = self.MockRawIO()
1182 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001183 bufio.write(b"abc")
1184 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001185 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001186
Antoine Pitrou131a4892012-10-16 22:57:11 +02001187 def test_writelines(self):
1188 l = [b'ab', b'cd', b'ef']
1189 writer = self.MockRawIO()
1190 bufio = self.tp(writer, 8)
1191 bufio.writelines(l)
1192 bufio.flush()
1193 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1194
1195 def test_writelines_userlist(self):
1196 l = UserList([b'ab', b'cd', b'ef'])
1197 writer = self.MockRawIO()
1198 bufio = self.tp(writer, 8)
1199 bufio.writelines(l)
1200 bufio.flush()
1201 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1202
1203 def test_writelines_error(self):
1204 writer = self.MockRawIO()
1205 bufio = self.tp(writer, 8)
1206 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1207 self.assertRaises(TypeError, bufio.writelines, None)
1208 self.assertRaises(TypeError, bufio.writelines, 'abc')
1209
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001210 def test_destructor(self):
1211 writer = self.MockRawIO()
1212 bufio = self.tp(writer, 8)
1213 bufio.write(b"abc")
1214 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001215 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001216 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001217
1218 def test_truncate(self):
1219 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001220 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001221 bufio = self.tp(raw, 8)
1222 bufio.write(b"abcdef")
1223 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001224 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001225 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001226 self.assertEqual(f.read(), b"abc")
1227
Victor Stinner45df8202010-04-28 22:31:17 +00001228 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001229 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001230 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001231 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001232 # Write out many bytes from many threads and test they were
1233 # all flushed.
1234 N = 1000
1235 contents = bytes(range(256)) * N
1236 sizes = cycle([1, 19])
1237 n = 0
1238 queue = deque()
1239 while n < len(contents):
1240 size = next(sizes)
1241 queue.append(contents[n:n+size])
1242 n += size
1243 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001244 # We use a real file object because it allows us to
1245 # exercise situations where the GIL is released before
1246 # writing the buffer to the raw streams. This is in addition
1247 # to concurrency issues due to switching threads in the middle
1248 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001249 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001250 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001251 errors = []
1252 def f():
1253 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001254 while True:
1255 try:
1256 s = queue.popleft()
1257 except IndexError:
1258 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001259 bufio.write(s)
1260 except Exception as e:
1261 errors.append(e)
1262 raise
1263 threads = [threading.Thread(target=f) for x in range(20)]
1264 for t in threads:
1265 t.start()
1266 time.sleep(0.02) # yield
1267 for t in threads:
1268 t.join()
1269 self.assertFalse(errors,
1270 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001271 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001272 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001273 s = f.read()
1274 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001275 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001276 finally:
1277 support.unlink(support.TESTFN)
1278
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001279 def test_misbehaved_io(self):
1280 rawio = self.MisbehavedRawIO()
1281 bufio = self.tp(rawio, 5)
1282 self.assertRaises(IOError, bufio.seek, 0)
1283 self.assertRaises(IOError, bufio.tell)
1284 self.assertRaises(IOError, bufio.write, b"abcdef")
1285
Benjamin Peterson59406a92009-03-26 17:10:29 +00001286 def test_max_buffer_size_deprecation(self):
Florent Xicluna8fbddf12010-03-17 20:29:51 +00001287 with support.check_warnings(("max_buffer_size is deprecated",
1288 DeprecationWarning)):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001289 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001290
1291
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001292class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001293 tp = io.BufferedWriter
1294
1295 def test_constructor(self):
1296 BufferedWriterTest.test_constructor(self)
1297 # The allocation can succeed on 32-bit builds, e.g. with more
1298 # than 2GB RAM and a 64-bit kernel.
1299 if sys.maxsize > 0x7FFFFFFF:
1300 rawio = self.MockRawIO()
1301 bufio = self.tp(rawio)
1302 self.assertRaises((OverflowError, MemoryError, ValueError),
1303 bufio.__init__, rawio, sys.maxsize)
1304
1305 def test_initialization(self):
1306 rawio = self.MockRawIO()
1307 bufio = self.tp(rawio)
1308 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1309 self.assertRaises(ValueError, bufio.write, b"def")
1310 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1311 self.assertRaises(ValueError, bufio.write, b"def")
1312 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1313 self.assertRaises(ValueError, bufio.write, b"def")
1314
1315 def test_garbage_collection(self):
1316 # C BufferedWriter objects are collected, and collecting them flushes
1317 # all data to disk.
1318 # The Python version has __del__, so it ends into gc.garbage instead
1319 rawio = self.FileIO(support.TESTFN, "w+b")
1320 f = self.tp(rawio)
1321 f.write(b"123xxx")
1322 f.x = f
1323 wr = weakref.ref(f)
1324 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001325 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001326 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001327 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001328 self.assertEqual(f.read(), b"123xxx")
1329
R David Murray67bfe802013-02-23 21:51:05 -05001330 def test_args_error(self):
1331 # Issue #17275
1332 with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1333 self.tp(io.BytesIO(), 1024, 1024, 1024)
1334
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001335
1336class PyBufferedWriterTest(BufferedWriterTest):
1337 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001338
Guido van Rossum01a27522007-03-07 01:00:12 +00001339class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001340
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001341 def test_constructor(self):
1342 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001343 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001344
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001345 def test_detach(self):
1346 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1347 self.assertRaises(self.UnsupportedOperation, pair.detach)
1348
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001349 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna8fbddf12010-03-17 20:29:51 +00001350 with support.check_warnings(("max_buffer_size is deprecated",
1351 DeprecationWarning)):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001352 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001353
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001354 def test_constructor_with_not_readable(self):
1355 class NotReadable(MockRawIO):
1356 def readable(self):
1357 return False
1358
1359 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1360
1361 def test_constructor_with_not_writeable(self):
1362 class NotWriteable(MockRawIO):
1363 def writable(self):
1364 return False
1365
1366 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1367
1368 def test_read(self):
1369 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1370
1371 self.assertEqual(pair.read(3), b"abc")
1372 self.assertEqual(pair.read(1), b"d")
1373 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001374 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1375 self.assertEqual(pair.read(None), b"abc")
1376
1377 def test_readlines(self):
1378 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1379 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1380 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1381 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001382
1383 def test_read1(self):
1384 # .read1() is delegated to the underlying reader object, so this test
1385 # can be shallow.
1386 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1387
1388 self.assertEqual(pair.read1(3), b"abc")
1389
1390 def test_readinto(self):
1391 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1392
1393 data = bytearray(5)
1394 self.assertEqual(pair.readinto(data), 5)
1395 self.assertEqual(data, b"abcde")
1396
1397 def test_write(self):
1398 w = self.MockRawIO()
1399 pair = self.tp(self.MockRawIO(), w)
1400
1401 pair.write(b"abc")
1402 pair.flush()
1403 pair.write(b"def")
1404 pair.flush()
1405 self.assertEqual(w._write_stack, [b"abc", b"def"])
1406
1407 def test_peek(self):
1408 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1409
1410 self.assertTrue(pair.peek(3).startswith(b"abc"))
1411 self.assertEqual(pair.read(3), b"abc")
1412
1413 def test_readable(self):
1414 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1415 self.assertTrue(pair.readable())
1416
1417 def test_writeable(self):
1418 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1419 self.assertTrue(pair.writable())
1420
1421 def test_seekable(self):
1422 # BufferedRWPairs are never seekable, even if their readers and writers
1423 # are.
1424 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1425 self.assertFalse(pair.seekable())
1426
1427 # .flush() is delegated to the underlying writer object and has been
1428 # tested in the test_write method.
1429
1430 def test_close_and_closed(self):
1431 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1432 self.assertFalse(pair.closed)
1433 pair.close()
1434 self.assertTrue(pair.closed)
1435
1436 def test_isatty(self):
1437 class SelectableIsAtty(MockRawIO):
1438 def __init__(self, isatty):
1439 MockRawIO.__init__(self)
1440 self._isatty = isatty
1441
1442 def isatty(self):
1443 return self._isatty
1444
1445 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1446 self.assertFalse(pair.isatty())
1447
1448 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1449 self.assertTrue(pair.isatty())
1450
1451 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1452 self.assertTrue(pair.isatty())
1453
1454 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1455 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001456
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001457class CBufferedRWPairTest(BufferedRWPairTest):
1458 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001459
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001460class PyBufferedRWPairTest(BufferedRWPairTest):
1461 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001462
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001463
1464class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1465 read_mode = "rb+"
1466 write_mode = "wb+"
1467
1468 def test_constructor(self):
1469 BufferedReaderTest.test_constructor(self)
1470 BufferedWriterTest.test_constructor(self)
1471
1472 def test_read_and_write(self):
1473 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001474 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001475
1476 self.assertEqual(b"as", rw.read(2))
1477 rw.write(b"ddd")
1478 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001479 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001480 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001481 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001482
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001483 def test_seek_and_tell(self):
1484 raw = self.BytesIO(b"asdfghjkl")
1485 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001486
Ezio Melottib3aedd42010-11-20 19:04:17 +00001487 self.assertEqual(b"as", rw.read(2))
1488 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001489 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001490 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00001491
Antoine Pitroue05565e2011-08-20 14:39:23 +02001492 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00001493 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02001494 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001495 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001496 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001497 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001498 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001499 self.assertEqual(7, rw.tell())
1500 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02001501 rw.flush()
1502 self.assertEqual(b"asdf123fl", raw.getvalue())
1503
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001504 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001505
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001506 def check_flush_and_read(self, read_func):
1507 raw = self.BytesIO(b"abcdefghi")
1508 bufio = self.tp(raw)
1509
Ezio Melottib3aedd42010-11-20 19:04:17 +00001510 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001511 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001512 self.assertEqual(b"ef", read_func(bufio, 2))
1513 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001514 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001515 self.assertEqual(6, bufio.tell())
1516 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001517 raw.seek(0, 0)
1518 raw.write(b"XYZ")
1519 # flush() resets the read buffer
1520 bufio.flush()
1521 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001522 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001523
1524 def test_flush_and_read(self):
1525 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1526
1527 def test_flush_and_readinto(self):
1528 def _readinto(bufio, n=-1):
1529 b = bytearray(n if n >= 0 else 9999)
1530 n = bufio.readinto(b)
1531 return bytes(b[:n])
1532 self.check_flush_and_read(_readinto)
1533
1534 def test_flush_and_peek(self):
1535 def _peek(bufio, n=-1):
1536 # This relies on the fact that the buffer can contain the whole
1537 # raw stream, otherwise peek() can return less.
1538 b = bufio.peek(n)
1539 if n != -1:
1540 b = b[:n]
1541 bufio.seek(len(b), 1)
1542 return b
1543 self.check_flush_and_read(_peek)
1544
1545 def test_flush_and_write(self):
1546 raw = self.BytesIO(b"abcdefghi")
1547 bufio = self.tp(raw)
1548
1549 bufio.write(b"123")
1550 bufio.flush()
1551 bufio.write(b"45")
1552 bufio.flush()
1553 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001554 self.assertEqual(b"12345fghi", raw.getvalue())
1555 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001556
1557 def test_threads(self):
1558 BufferedReaderTest.test_threads(self)
1559 BufferedWriterTest.test_threads(self)
1560
1561 def test_writes_and_peek(self):
1562 def _peek(bufio):
1563 bufio.peek(1)
1564 self.check_writes(_peek)
1565 def _peek(bufio):
1566 pos = bufio.tell()
1567 bufio.seek(-1, 1)
1568 bufio.peek(1)
1569 bufio.seek(pos, 0)
1570 self.check_writes(_peek)
1571
1572 def test_writes_and_reads(self):
1573 def _read(bufio):
1574 bufio.seek(-1, 1)
1575 bufio.read(1)
1576 self.check_writes(_read)
1577
1578 def test_writes_and_read1s(self):
1579 def _read1(bufio):
1580 bufio.seek(-1, 1)
1581 bufio.read1(1)
1582 self.check_writes(_read1)
1583
1584 def test_writes_and_readintos(self):
1585 def _read(bufio):
1586 bufio.seek(-1, 1)
1587 bufio.readinto(bytearray(1))
1588 self.check_writes(_read)
1589
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001590 def test_write_after_readahead(self):
1591 # Issue #6629: writing after the buffer was filled by readahead should
1592 # first rewind the raw stream.
1593 for overwrite_size in [1, 5]:
1594 raw = self.BytesIO(b"A" * 10)
1595 bufio = self.tp(raw, 4)
1596 # Trigger readahead
1597 self.assertEqual(bufio.read(1), b"A")
1598 self.assertEqual(bufio.tell(), 1)
1599 # Overwriting should rewind the raw stream if it needs so
1600 bufio.write(b"B" * overwrite_size)
1601 self.assertEqual(bufio.tell(), overwrite_size + 1)
1602 # If the write size was smaller than the buffer size, flush() and
1603 # check that rewind happens.
1604 bufio.flush()
1605 self.assertEqual(bufio.tell(), overwrite_size + 1)
1606 s = raw.getvalue()
1607 self.assertEqual(s,
1608 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1609
Antoine Pitrou7c404892011-05-13 00:13:33 +02001610 def test_write_rewind_write(self):
1611 # Various combinations of reading / writing / seeking backwards / writing again
1612 def mutate(bufio, pos1, pos2):
1613 assert pos2 >= pos1
1614 # Fill the buffer
1615 bufio.seek(pos1)
1616 bufio.read(pos2 - pos1)
1617 bufio.write(b'\x02')
1618 # This writes earlier than the previous write, but still inside
1619 # the buffer.
1620 bufio.seek(pos1)
1621 bufio.write(b'\x01')
1622
1623 b = b"\x80\x81\x82\x83\x84"
1624 for i in range(0, len(b)):
1625 for j in range(i, len(b)):
1626 raw = self.BytesIO(b)
1627 bufio = self.tp(raw, 100)
1628 mutate(bufio, i, j)
1629 bufio.flush()
1630 expected = bytearray(b)
1631 expected[j] = 2
1632 expected[i] = 1
1633 self.assertEqual(raw.getvalue(), expected,
1634 "failed result for i=%d, j=%d" % (i, j))
1635
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001636 def test_truncate_after_read_or_write(self):
1637 raw = self.BytesIO(b"A" * 10)
1638 bufio = self.tp(raw, 100)
1639 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1640 self.assertEqual(bufio.truncate(), 2)
1641 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1642 self.assertEqual(bufio.truncate(), 4)
1643
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001644 def test_misbehaved_io(self):
1645 BufferedReaderTest.test_misbehaved_io(self)
1646 BufferedWriterTest.test_misbehaved_io(self)
1647
Antoine Pitroue05565e2011-08-20 14:39:23 +02001648 def test_interleaved_read_write(self):
1649 # Test for issue #12213
1650 with self.BytesIO(b'abcdefgh') as raw:
1651 with self.tp(raw, 100) as f:
1652 f.write(b"1")
1653 self.assertEqual(f.read(1), b'b')
1654 f.write(b'2')
1655 self.assertEqual(f.read1(1), b'd')
1656 f.write(b'3')
1657 buf = bytearray(1)
1658 f.readinto(buf)
1659 self.assertEqual(buf, b'f')
1660 f.write(b'4')
1661 self.assertEqual(f.peek(1), b'h')
1662 f.flush()
1663 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1664
1665 with self.BytesIO(b'abc') as raw:
1666 with self.tp(raw, 100) as f:
1667 self.assertEqual(f.read(1), b'a')
1668 f.write(b"2")
1669 self.assertEqual(f.read(1), b'c')
1670 f.flush()
1671 self.assertEqual(raw.getvalue(), b'a2c')
1672
1673 def test_interleaved_readline_write(self):
1674 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1675 with self.tp(raw) as f:
1676 f.write(b'1')
1677 self.assertEqual(f.readline(), b'b\n')
1678 f.write(b'2')
1679 self.assertEqual(f.readline(), b'def\n')
1680 f.write(b'3')
1681 self.assertEqual(f.readline(), b'\n')
1682 f.flush()
1683 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1684
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001685 # You can't construct a BufferedRandom over a non-seekable stream.
1686 test_unseekable = None
1687
R David Murray67bfe802013-02-23 21:51:05 -05001688
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001689class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001690 tp = io.BufferedRandom
1691
1692 def test_constructor(self):
1693 BufferedRandomTest.test_constructor(self)
1694 # The allocation can succeed on 32-bit builds, e.g. with more
1695 # than 2GB RAM and a 64-bit kernel.
1696 if sys.maxsize > 0x7FFFFFFF:
1697 rawio = self.MockRawIO()
1698 bufio = self.tp(rawio)
1699 self.assertRaises((OverflowError, MemoryError, ValueError),
1700 bufio.__init__, rawio, sys.maxsize)
1701
1702 def test_garbage_collection(self):
1703 CBufferedReaderTest.test_garbage_collection(self)
1704 CBufferedWriterTest.test_garbage_collection(self)
1705
R David Murray67bfe802013-02-23 21:51:05 -05001706 def test_args_error(self):
1707 # Issue #17275
1708 with self.assertRaisesRegex(TypeError, "BufferedRandom"):
1709 self.tp(io.BytesIO(), 1024, 1024, 1024)
1710
1711
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001712class PyBufferedRandomTest(BufferedRandomTest):
1713 tp = pyio.BufferedRandom
1714
1715
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001716# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1717# properties:
1718# - A single output character can correspond to many bytes of input.
1719# - The number of input bytes to complete the character can be
1720# undetermined until the last input byte is received.
1721# - The number of input bytes can vary depending on previous input.
1722# - A single input byte can correspond to many characters of output.
1723# - The number of output characters can be undetermined until the
1724# last input byte is received.
1725# - The number of output characters can vary depending on previous input.
1726
1727class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1728 """
1729 For testing seek/tell behavior with a stateful, buffering decoder.
1730
1731 Input is a sequence of words. Words may be fixed-length (length set
1732 by input) or variable-length (period-terminated). In variable-length
1733 mode, extra periods are ignored. Possible words are:
1734 - 'i' followed by a number sets the input length, I (maximum 99).
1735 When I is set to 0, words are space-terminated.
1736 - 'o' followed by a number sets the output length, O (maximum 99).
1737 - Any other word is converted into a word followed by a period on
1738 the output. The output word consists of the input word truncated
1739 or padded out with hyphens to make its length equal to O. If O
1740 is 0, the word is output verbatim without truncating or padding.
1741 I and O are initially set to 1. When I changes, any buffered input is
1742 re-scanned according to the new I. EOF also terminates the last word.
1743 """
1744
1745 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001746 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001747 self.reset()
1748
1749 def __repr__(self):
1750 return '<SID %x>' % id(self)
1751
1752 def reset(self):
1753 self.i = 1
1754 self.o = 1
1755 self.buffer = bytearray()
1756
1757 def getstate(self):
1758 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1759 return bytes(self.buffer), i*100 + o
1760
1761 def setstate(self, state):
1762 buffer, io = state
1763 self.buffer = bytearray(buffer)
1764 i, o = divmod(io, 100)
1765 self.i, self.o = i ^ 1, o ^ 1
1766
1767 def decode(self, input, final=False):
1768 output = ''
1769 for b in input:
1770 if self.i == 0: # variable-length, terminated with period
1771 if b == ord('.'):
1772 if self.buffer:
1773 output += self.process_word()
1774 else:
1775 self.buffer.append(b)
1776 else: # fixed-length, terminate after self.i bytes
1777 self.buffer.append(b)
1778 if len(self.buffer) == self.i:
1779 output += self.process_word()
1780 if final and self.buffer: # EOF terminates the last word
1781 output += self.process_word()
1782 return output
1783
1784 def process_word(self):
1785 output = ''
1786 if self.buffer[0] == ord('i'):
1787 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1788 elif self.buffer[0] == ord('o'):
1789 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1790 else:
1791 output = self.buffer.decode('ascii')
1792 if len(output) < self.o:
1793 output += '-'*self.o # pad out with hyphens
1794 if self.o:
1795 output = output[:self.o] # truncate to output length
1796 output += '.'
1797 self.buffer = bytearray()
1798 return output
1799
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001800 codecEnabled = False
1801
1802 @classmethod
1803 def lookupTestDecoder(cls, name):
1804 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001805 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001806 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001807 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001808 incrementalencoder=None,
1809 streamreader=None, streamwriter=None,
1810 incrementaldecoder=cls)
1811
1812# Register the previous decoder for testing.
1813# Disabled by default, tests will enable it.
1814codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1815
1816
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001817class StatefulIncrementalDecoderTest(unittest.TestCase):
1818 """
1819 Make sure the StatefulIncrementalDecoder actually works.
1820 """
1821
1822 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001823 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001824 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001825 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001826 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001827 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001828 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001829 # I=0, O=6 (variable-length input, fixed-length output)
1830 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1831 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001832 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001833 # I=6, O=3 (fixed-length input > fixed-length output)
1834 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1835 # I=0, then 3; O=29, then 15 (with longer output)
1836 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1837 'a----------------------------.' +
1838 'b----------------------------.' +
1839 'cde--------------------------.' +
1840 'abcdefghijabcde.' +
1841 'a.b------------.' +
1842 '.c.------------.' +
1843 'd.e------------.' +
1844 'k--------------.' +
1845 'l--------------.' +
1846 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001847 ]
1848
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001849 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001850 # Try a few one-shot test cases.
1851 for input, eof, output in self.test_cases:
1852 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001853 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001854
1855 # Also test an unfinished decode, followed by forcing EOF.
1856 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001857 self.assertEqual(d.decode(b'oiabcd'), '')
1858 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001859
1860class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001861
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001862 def setUp(self):
1863 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1864 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001865 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001866
Guido van Rossumd0712812007-04-11 16:32:43 +00001867 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001868 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001869
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001870 def test_constructor(self):
1871 r = self.BytesIO(b"\xc3\xa9\n\n")
1872 b = self.BufferedReader(r, 1000)
1873 t = self.TextIOWrapper(b)
1874 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001875 self.assertEqual(t.encoding, "latin1")
1876 self.assertEqual(t.line_buffering, False)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001877 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001878 self.assertEqual(t.encoding, "utf8")
1879 self.assertEqual(t.line_buffering, True)
1880 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001881 self.assertRaises(TypeError, t.__init__, b, newline=42)
1882 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1883
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001884 def test_detach(self):
1885 r = self.BytesIO()
1886 b = self.BufferedWriter(r)
1887 t = self.TextIOWrapper(b)
1888 self.assertIs(t.detach(), b)
1889
1890 t = self.TextIOWrapper(b, encoding="ascii")
1891 t.write("howdy")
1892 self.assertFalse(r.getvalue())
1893 t.detach()
1894 self.assertEqual(r.getvalue(), b"howdy")
1895 self.assertRaises(ValueError, t.detach)
1896
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001897 def test_repr(self):
1898 raw = self.BytesIO("hello".encode("utf-8"))
1899 b = self.BufferedReader(raw)
1900 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001901 modname = self.TextIOWrapper.__module__
1902 self.assertEqual(repr(t),
1903 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1904 raw.name = "dummy"
1905 self.assertEqual(repr(t),
1906 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001907 t.mode = "r"
1908 self.assertEqual(repr(t),
1909 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001910 raw.name = b"dummy"
1911 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001912 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001913
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001914 def test_line_buffering(self):
1915 r = self.BytesIO()
1916 b = self.BufferedWriter(r, 1000)
1917 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001918 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001919 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001920 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001921 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001922 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001923 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001924
Serhiy Storchaka441d30f2013-01-19 12:26:26 +02001925 # Issue 15989
1926 def test_device_encoding(self):
1927 b = self.BytesIO()
1928 b.fileno = lambda: _testcapi.INT_MAX + 1
1929 self.assertRaises(OverflowError, self.TextIOWrapper, b)
1930 b.fileno = lambda: _testcapi.UINT_MAX + 1
1931 self.assertRaises(OverflowError, self.TextIOWrapper, b)
1932
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001933 def test_encoding(self):
1934 # Check the encoding attribute is always set, and valid
1935 b = self.BytesIO()
1936 t = self.TextIOWrapper(b, encoding="utf8")
1937 self.assertEqual(t.encoding, "utf8")
1938 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001939 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001940 codecs.lookup(t.encoding)
1941
1942 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001943 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001944 b = self.BytesIO(b"abc\n\xff\n")
1945 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001946 self.assertRaises(UnicodeError, t.read)
1947 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001948 b = self.BytesIO(b"abc\n\xff\n")
1949 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001950 self.assertRaises(UnicodeError, t.read)
1951 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001952 b = self.BytesIO(b"abc\n\xff\n")
1953 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001954 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001955 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001956 b = self.BytesIO(b"abc\n\xff\n")
1957 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001958 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001959
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001960 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001961 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001962 b = self.BytesIO()
1963 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001964 self.assertRaises(UnicodeError, t.write, "\xff")
1965 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001966 b = self.BytesIO()
1967 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001968 self.assertRaises(UnicodeError, t.write, "\xff")
1969 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001970 b = self.BytesIO()
1971 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001972 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001973 t.write("abc\xffdef\n")
1974 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001975 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001976 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001977 b = self.BytesIO()
1978 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001979 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001980 t.write("abc\xffdef\n")
1981 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001982 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001983
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001984 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001985 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1986
1987 tests = [
1988 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001989 [ '', input_lines ],
1990 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1991 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1992 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001993 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001994 encodings = (
1995 'utf-8', 'latin-1',
1996 'utf-16', 'utf-16-le', 'utf-16-be',
1997 'utf-32', 'utf-32-le', 'utf-32-be',
1998 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001999
Guido van Rossum8358db22007-08-18 21:39:55 +00002000 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002001 # character in TextIOWrapper._pending_line.
2002 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002003 # XXX: str.encode() should return bytes
2004 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002005 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002006 for bufsize in range(1, 10):
2007 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002008 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2009 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002010 encoding=encoding)
2011 if do_reads:
2012 got_lines = []
2013 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002014 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002015 if c2 == '':
2016 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002017 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002018 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002019 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002020 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002021
2022 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002023 self.assertEqual(got_line, exp_line)
2024 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002025
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002026 def test_newlines_input(self):
2027 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002028 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2029 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002030 (None, normalized.decode("ascii").splitlines(True)),
2031 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002032 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2033 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2034 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002035 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002036 buf = self.BytesIO(testdata)
2037 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002038 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002039 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002040 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002041
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002042 def test_newlines_output(self):
2043 testdict = {
2044 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2045 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2046 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2047 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2048 }
2049 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2050 for newline, expected in tests:
2051 buf = self.BytesIO()
2052 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2053 txt.write("AAA\nB")
2054 txt.write("BB\nCCC\n")
2055 txt.write("X\rY\r\nZ")
2056 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002057 self.assertEqual(buf.closed, False)
2058 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002059
2060 def test_destructor(self):
2061 l = []
2062 base = self.BytesIO
2063 class MyBytesIO(base):
2064 def close(self):
2065 l.append(self.getvalue())
2066 base.close(self)
2067 b = MyBytesIO()
2068 t = self.TextIOWrapper(b, encoding="ascii")
2069 t.write("abc")
2070 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002071 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002072 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002073
2074 def test_override_destructor(self):
2075 record = []
2076 class MyTextIO(self.TextIOWrapper):
2077 def __del__(self):
2078 record.append(1)
2079 try:
2080 f = super().__del__
2081 except AttributeError:
2082 pass
2083 else:
2084 f()
2085 def close(self):
2086 record.append(2)
2087 super().close()
2088 def flush(self):
2089 record.append(3)
2090 super().flush()
2091 b = self.BytesIO()
2092 t = MyTextIO(b, encoding="ascii")
2093 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002094 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002095 self.assertEqual(record, [1, 2, 3])
2096
2097 def test_error_through_destructor(self):
2098 # Test that the exception state is not modified by a destructor,
2099 # even if close() fails.
2100 rawio = self.CloseFailureIO()
2101 def f():
2102 self.TextIOWrapper(rawio).xyzzy
2103 with support.captured_output("stderr") as s:
2104 self.assertRaises(AttributeError, f)
2105 s = s.getvalue().strip()
2106 if s:
2107 # The destructor *may* have printed an unraisable error, check it
2108 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002109 self.assertTrue(s.startswith("Exception IOError: "), s)
2110 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002111
Guido van Rossum9b76da62007-04-11 01:09:03 +00002112 # Systematic tests of the text I/O API
2113
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002114 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002115 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
2116 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002117 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002118 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002119 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002120 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002121 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002122 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002123 self.assertEqual(f.tell(), 0)
2124 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002125 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002126 self.assertEqual(f.seek(0), 0)
2127 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002128 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002129 self.assertEqual(f.read(2), "ab")
2130 self.assertEqual(f.read(1), "c")
2131 self.assertEqual(f.read(1), "")
2132 self.assertEqual(f.read(), "")
2133 self.assertEqual(f.tell(), cookie)
2134 self.assertEqual(f.seek(0), 0)
2135 self.assertEqual(f.seek(0, 2), cookie)
2136 self.assertEqual(f.write("def"), 3)
2137 self.assertEqual(f.seek(cookie), cookie)
2138 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002139 if enc.startswith("utf"):
2140 self.multi_line_test(f, enc)
2141 f.close()
2142
2143 def multi_line_test(self, f, enc):
2144 f.seek(0)
2145 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002146 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002147 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002148 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002149 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002150 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002151 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002152 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002153 wlines.append((f.tell(), line))
2154 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002155 f.seek(0)
2156 rlines = []
2157 while True:
2158 pos = f.tell()
2159 line = f.readline()
2160 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002161 break
2162 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002163 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002164
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002165 def test_telling(self):
2166 f = self.open(support.TESTFN, "w+", encoding="utf8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002167 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002168 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002169 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002170 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002171 p2 = f.tell()
2172 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002173 self.assertEqual(f.tell(), p0)
2174 self.assertEqual(f.readline(), "\xff\n")
2175 self.assertEqual(f.tell(), p1)
2176 self.assertEqual(f.readline(), "\xff\n")
2177 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002178 f.seek(0)
2179 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002180 self.assertEqual(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002181 self.assertRaises(IOError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002182 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002183 f.close()
2184
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002185 def test_seeking(self):
2186 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002187 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002188 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002189 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002190 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002191 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002192 suffix = bytes(u_suffix.encode("utf-8"))
2193 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002194 with self.open(support.TESTFN, "wb") as f:
2195 f.write(line*2)
2196 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2197 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002198 self.assertEqual(s, str(prefix, "ascii"))
2199 self.assertEqual(f.tell(), prefix_size)
2200 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002201
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002202 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002203 # Regression test for a specific bug
2204 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002205 with self.open(support.TESTFN, "wb") as f:
2206 f.write(data)
2207 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2208 f._CHUNK_SIZE # Just test that it exists
2209 f._CHUNK_SIZE = 2
2210 f.readline()
2211 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002212
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002213 def test_seek_and_tell(self):
2214 #Test seek/tell using the StatefulIncrementalDecoder.
2215 # Make test faster by doing smaller seeks
2216 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002217
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002218 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002219 """Tell/seek to various points within a data stream and ensure
2220 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002221 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002222 f.write(data)
2223 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002224 f = self.open(support.TESTFN, encoding='test_decoder')
2225 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002226 decoded = f.read()
2227 f.close()
2228
Neal Norwitze2b07052008-03-18 19:52:05 +00002229 for i in range(min_pos, len(decoded) + 1): # seek positions
2230 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002231 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002232 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002233 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002234 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002235 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002236 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002237 f.close()
2238
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002239 # Enable the test decoder.
2240 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002241
2242 # Run the tests.
2243 try:
2244 # Try each test case.
2245 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002246 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002247
2248 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002249 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2250 offset = CHUNK_SIZE - len(input)//2
2251 prefix = b'.'*offset
2252 # Don't bother seeking into the prefix (takes too long).
2253 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002254 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002255
2256 # Ensure our test decoder won't interfere with subsequent tests.
2257 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002258 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002259
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002260 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002261 data = "1234567890"
2262 tests = ("utf-16",
2263 "utf-16-le",
2264 "utf-16-be",
2265 "utf-32",
2266 "utf-32-le",
2267 "utf-32-be")
2268 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002269 buf = self.BytesIO()
2270 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002271 # Check if the BOM is written only once (see issue1753).
2272 f.write(data)
2273 f.write(data)
2274 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002275 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002276 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002277 self.assertEqual(f.read(), data * 2)
2278 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002279
Benjamin Petersona1b49012009-03-31 23:11:32 +00002280 def test_unreadable(self):
2281 class UnReadable(self.BytesIO):
2282 def readable(self):
2283 return False
2284 txt = self.TextIOWrapper(UnReadable())
2285 self.assertRaises(IOError, txt.read)
2286
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002287 def test_read_one_by_one(self):
2288 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002289 reads = ""
2290 while True:
2291 c = txt.read(1)
2292 if not c:
2293 break
2294 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002295 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002296
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002297 def test_readlines(self):
2298 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2299 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2300 txt.seek(0)
2301 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2302 txt.seek(0)
2303 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2304
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002305 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002306 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002307 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002308 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002309 reads = ""
2310 while True:
2311 c = txt.read(128)
2312 if not c:
2313 break
2314 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002315 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002316
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002317 def test_writelines(self):
2318 l = ['ab', 'cd', 'ef']
2319 buf = self.BytesIO()
2320 txt = self.TextIOWrapper(buf)
2321 txt.writelines(l)
2322 txt.flush()
2323 self.assertEqual(buf.getvalue(), b'abcdef')
2324
2325 def test_writelines_userlist(self):
2326 l = UserList(['ab', 'cd', 'ef'])
2327 buf = self.BytesIO()
2328 txt = self.TextIOWrapper(buf)
2329 txt.writelines(l)
2330 txt.flush()
2331 self.assertEqual(buf.getvalue(), b'abcdef')
2332
2333 def test_writelines_error(self):
2334 txt = self.TextIOWrapper(self.BytesIO())
2335 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2336 self.assertRaises(TypeError, txt.writelines, None)
2337 self.assertRaises(TypeError, txt.writelines, b'abc')
2338
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002339 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002340 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002341
2342 # read one char at a time
2343 reads = ""
2344 while True:
2345 c = txt.read(1)
2346 if not c:
2347 break
2348 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002349 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002350
2351 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002352 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002353 txt._CHUNK_SIZE = 4
2354
2355 reads = ""
2356 while True:
2357 c = txt.read(4)
2358 if not c:
2359 break
2360 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002361 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002362
2363 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002364 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002365 txt._CHUNK_SIZE = 4
2366
2367 reads = txt.read(4)
2368 reads += txt.read(4)
2369 reads += txt.readline()
2370 reads += txt.readline()
2371 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002372 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002373
2374 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002375 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002376 txt._CHUNK_SIZE = 4
2377
2378 reads = txt.read(4)
2379 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002380 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002381
2382 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002383 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002384 txt._CHUNK_SIZE = 4
2385
2386 reads = txt.read(4)
2387 pos = txt.tell()
2388 txt.seek(0)
2389 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002390 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002391
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002392 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002393 buffer = self.BytesIO(self.testdata)
2394 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002395
2396 self.assertEqual(buffer.seekable(), txt.seekable())
2397
Antoine Pitroue4501852009-05-14 18:55:55 +00002398 def test_append_bom(self):
2399 # The BOM is not written again when appending to a non-empty file
2400 filename = support.TESTFN
2401 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2402 with self.open(filename, 'w', encoding=charset) as f:
2403 f.write('aaa')
2404 pos = f.tell()
2405 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002406 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002407
2408 with self.open(filename, 'a', encoding=charset) as f:
2409 f.write('xxx')
2410 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002411 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002412
2413 def test_seek_bom(self):
2414 # Same test, but when seeking manually
2415 filename = support.TESTFN
2416 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2417 with self.open(filename, 'w', encoding=charset) as f:
2418 f.write('aaa')
2419 pos = f.tell()
2420 with self.open(filename, 'r+', encoding=charset) as f:
2421 f.seek(pos)
2422 f.write('zzz')
2423 f.seek(0)
2424 f.write('bbb')
2425 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002426 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002427
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002428 def test_errors_property(self):
2429 with self.open(support.TESTFN, "w") as f:
2430 self.assertEqual(f.errors, "strict")
2431 with self.open(support.TESTFN, "w", errors="replace") as f:
2432 self.assertEqual(f.errors, "replace")
2433
Victor Stinner45df8202010-04-28 22:31:17 +00002434 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002435 def test_threads_write(self):
2436 # Issue6750: concurrent writes could duplicate data
2437 event = threading.Event()
2438 with self.open(support.TESTFN, "w", buffering=1) as f:
2439 def run(n):
2440 text = "Thread%03d\n" % n
2441 event.wait()
2442 f.write(text)
2443 threads = [threading.Thread(target=lambda n=x: run(n))
2444 for x in range(20)]
2445 for t in threads:
2446 t.start()
2447 time.sleep(0.02)
2448 event.set()
2449 for t in threads:
2450 t.join()
2451 with self.open(support.TESTFN) as f:
2452 content = f.read()
2453 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002454 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002455
Antoine Pitrou6be88762010-05-03 16:48:20 +00002456 def test_flush_error_on_close(self):
2457 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2458 def bad_flush():
2459 raise IOError()
2460 txt.flush = bad_flush
2461 self.assertRaises(IOError, txt.close) # exception not swallowed
2462
2463 def test_multi_close(self):
2464 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2465 txt.close()
2466 txt.close()
2467 txt.close()
2468 self.assertRaises(ValueError, txt.flush)
2469
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002470 def test_unseekable(self):
2471 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2472 self.assertRaises(self.UnsupportedOperation, txt.tell)
2473 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2474
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002475 def test_readonly_attributes(self):
2476 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2477 buf = self.BytesIO(self.testdata)
2478 with self.assertRaises(AttributeError):
2479 txt.buffer = buf
2480
Antoine Pitroue96ec682011-07-23 21:46:35 +02002481 def test_rawio(self):
2482 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
2483 # that subprocess.Popen() can have the required unbuffered
2484 # semantics with universal_newlines=True.
2485 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2486 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
2487 # Reads
2488 self.assertEqual(txt.read(4), 'abcd')
2489 self.assertEqual(txt.readline(), 'efghi\n')
2490 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
2491
2492 def test_rawio_write_through(self):
2493 # Issue #12591: with write_through=True, writes don't need a flush
2494 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2495 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
2496 write_through=True)
2497 txt.write('1')
2498 txt.write('23\n4')
2499 txt.write('5')
2500 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2501
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002502 def test_read_nonbytes(self):
2503 # Issue #17106
2504 # Crash when underlying read() returns non-bytes
2505 t = self.TextIOWrapper(self.StringIO('a'))
2506 self.assertRaises(TypeError, t.read, 1)
2507 t = self.TextIOWrapper(self.StringIO('a'))
2508 self.assertRaises(TypeError, t.readline)
2509 t = self.TextIOWrapper(self.StringIO('a'))
2510 self.assertRaises(TypeError, t.read)
2511
2512 def test_illegal_decoder(self):
2513 # Issue #17106
2514 # Crash when decoder returns non-string
2515 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2516 encoding='quopri_codec')
2517 self.assertRaises(TypeError, t.read, 1)
2518 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2519 encoding='quopri_codec')
2520 self.assertRaises(TypeError, t.readline)
2521 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2522 encoding='quopri_codec')
2523 self.assertRaises(TypeError, t.read)
2524
2525
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002526class CTextIOWrapperTest(TextIOWrapperTest):
2527
2528 def test_initialization(self):
2529 r = self.BytesIO(b"\xc3\xa9\n\n")
2530 b = self.BufferedReader(r, 1000)
2531 t = self.TextIOWrapper(b)
2532 self.assertRaises(TypeError, t.__init__, b, newline=42)
2533 self.assertRaises(ValueError, t.read)
2534 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2535 self.assertRaises(ValueError, t.read)
2536
2537 def test_garbage_collection(self):
2538 # C TextIOWrapper objects are collected, and collecting them flushes
2539 # all data to disk.
2540 # The Python version has __del__, so it ends in gc.garbage instead.
2541 rawio = io.FileIO(support.TESTFN, "wb")
2542 b = self.BufferedWriter(rawio)
2543 t = self.TextIOWrapper(b, encoding="ascii")
2544 t.write("456def")
2545 t.x = t
2546 wr = weakref.ref(t)
2547 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002548 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002549 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002550 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002551 self.assertEqual(f.read(), b"456def")
2552
Charles-François Natali42c28cd2011-10-05 19:53:43 +02002553 def test_rwpair_cleared_before_textio(self):
2554 # Issue 13070: TextIOWrapper's finalization would crash when called
2555 # after the reference to the underlying BufferedRWPair's writer got
2556 # cleared by the GC.
2557 for i in range(1000):
2558 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2559 t1 = self.TextIOWrapper(b1, encoding="ascii")
2560 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2561 t2 = self.TextIOWrapper(b2, encoding="ascii")
2562 # circular references
2563 t1.buddy = t2
2564 t2.buddy = t1
2565 support.gc_collect()
2566
2567
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002568class PyTextIOWrapperTest(TextIOWrapperTest):
2569 pass
2570
2571
2572class IncrementalNewlineDecoderTest(unittest.TestCase):
2573
2574 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002575 # UTF-8 specific tests for a newline decoder
2576 def _check_decode(b, s, **kwargs):
2577 # We exercise getstate() / setstate() as well as decode()
2578 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002579 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002580 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002581 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002582
Antoine Pitrou180a3362008-12-14 16:36:46 +00002583 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002584
Antoine Pitrou180a3362008-12-14 16:36:46 +00002585 _check_decode(b'\xe8', "")
2586 _check_decode(b'\xa2', "")
2587 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002588
Antoine Pitrou180a3362008-12-14 16:36:46 +00002589 _check_decode(b'\xe8', "")
2590 _check_decode(b'\xa2', "")
2591 _check_decode(b'\x88', "\u8888")
2592
2593 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002594 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2595
Antoine Pitrou180a3362008-12-14 16:36:46 +00002596 decoder.reset()
2597 _check_decode(b'\n', "\n")
2598 _check_decode(b'\r', "")
2599 _check_decode(b'', "\n", final=True)
2600 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002601
Antoine Pitrou180a3362008-12-14 16:36:46 +00002602 _check_decode(b'\r', "")
2603 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002604
Antoine Pitrou180a3362008-12-14 16:36:46 +00002605 _check_decode(b'\r\r\n', "\n\n")
2606 _check_decode(b'\r', "")
2607 _check_decode(b'\r', "\n")
2608 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002609
Antoine Pitrou180a3362008-12-14 16:36:46 +00002610 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2611 _check_decode(b'\xe8\xa2\x88', "\u8888")
2612 _check_decode(b'\n', "\n")
2613 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2614 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002615
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002616 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002617 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002618 if encoding is not None:
2619 encoder = codecs.getincrementalencoder(encoding)()
2620 def _decode_bytewise(s):
2621 # Decode one byte at a time
2622 for b in encoder.encode(s):
2623 result.append(decoder.decode(bytes([b])))
2624 else:
2625 encoder = None
2626 def _decode_bytewise(s):
2627 # Decode one char at a time
2628 for c in s:
2629 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002630 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002631 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002632 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00002633 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002634 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002635 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002636 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002637 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002638 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002639 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002640 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002641 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002642 input = "abc"
2643 if encoder is not None:
2644 encoder.reset()
2645 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002646 self.assertEqual(decoder.decode(input), "abc")
2647 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002648
2649 def test_newline_decoder(self):
2650 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002651 # None meaning the IncrementalNewlineDecoder takes unicode input
2652 # rather than bytes input
2653 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002654 'utf-16', 'utf-16-le', 'utf-16-be',
2655 'utf-32', 'utf-32-le', 'utf-32-be',
2656 )
2657 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002658 decoder = enc and codecs.getincrementaldecoder(enc)()
2659 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2660 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002661 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002662 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2663 self.check_newline_decoding_utf8(decoder)
2664
Antoine Pitrou66913e22009-03-06 23:40:56 +00002665 def test_newline_bytes(self):
2666 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2667 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002668 self.assertEqual(dec.newlines, None)
2669 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2670 self.assertEqual(dec.newlines, None)
2671 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2672 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00002673 dec = self.IncrementalNewlineDecoder(None, translate=False)
2674 _check(dec)
2675 dec = self.IncrementalNewlineDecoder(None, translate=True)
2676 _check(dec)
2677
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002678class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2679 pass
2680
2681class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2682 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002683
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002684
Guido van Rossum01a27522007-03-07 01:00:12 +00002685# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002686
Guido van Rossum5abbf752007-08-27 17:39:33 +00002687class MiscIOTest(unittest.TestCase):
2688
Barry Warsaw40e82462008-11-20 20:14:50 +00002689 def tearDown(self):
2690 support.unlink(support.TESTFN)
2691
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002692 def test___all__(self):
2693 for name in self.io.__all__:
2694 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002695 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002696 if name == "open":
2697 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002698 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002699 self.assertTrue(issubclass(obj, Exception), name)
2700 elif not name.startswith("SEEK_"):
2701 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002702
Barry Warsaw40e82462008-11-20 20:14:50 +00002703 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002704 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002705 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002706 f.close()
2707
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002708 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002709 self.assertEqual(f.name, support.TESTFN)
2710 self.assertEqual(f.buffer.name, support.TESTFN)
2711 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2712 self.assertEqual(f.mode, "U")
2713 self.assertEqual(f.buffer.mode, "rb")
2714 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002715 f.close()
2716
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002717 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002718 self.assertEqual(f.mode, "w+")
2719 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2720 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002721
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002722 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002723 self.assertEqual(g.mode, "wb")
2724 self.assertEqual(g.raw.mode, "wb")
2725 self.assertEqual(g.name, f.fileno())
2726 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00002727 f.close()
2728 g.close()
2729
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002730 def test_io_after_close(self):
2731 for kwargs in [
2732 {"mode": "w"},
2733 {"mode": "wb"},
2734 {"mode": "w", "buffering": 1},
2735 {"mode": "w", "buffering": 2},
2736 {"mode": "wb", "buffering": 0},
2737 {"mode": "r"},
2738 {"mode": "rb"},
2739 {"mode": "r", "buffering": 1},
2740 {"mode": "r", "buffering": 2},
2741 {"mode": "rb", "buffering": 0},
2742 {"mode": "w+"},
2743 {"mode": "w+b"},
2744 {"mode": "w+", "buffering": 1},
2745 {"mode": "w+", "buffering": 2},
2746 {"mode": "w+b", "buffering": 0},
2747 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002748 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002749 f.close()
2750 self.assertRaises(ValueError, f.flush)
2751 self.assertRaises(ValueError, f.fileno)
2752 self.assertRaises(ValueError, f.isatty)
2753 self.assertRaises(ValueError, f.__iter__)
2754 if hasattr(f, "peek"):
2755 self.assertRaises(ValueError, f.peek, 1)
2756 self.assertRaises(ValueError, f.read)
2757 if hasattr(f, "read1"):
2758 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02002759 if hasattr(f, "readall"):
2760 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002761 if hasattr(f, "readinto"):
2762 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2763 self.assertRaises(ValueError, f.readline)
2764 self.assertRaises(ValueError, f.readlines)
2765 self.assertRaises(ValueError, f.seek, 0)
2766 self.assertRaises(ValueError, f.tell)
2767 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002768 self.assertRaises(ValueError, f.write,
2769 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002770 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002771 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002772
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002773 def test_blockingioerror(self):
2774 # Various BlockingIOError issues
2775 self.assertRaises(TypeError, self.BlockingIOError)
2776 self.assertRaises(TypeError, self.BlockingIOError, 1)
2777 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2778 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2779 b = self.BlockingIOError(1, "")
2780 self.assertEqual(b.characters_written, 0)
2781 class C(str):
2782 pass
2783 c = C("")
2784 b = self.BlockingIOError(1, c)
2785 c.b = b
2786 b.c = c
2787 wr = weakref.ref(c)
2788 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002789 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002790 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002791
2792 def test_abcs(self):
2793 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002794 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2795 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2796 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2797 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002798
2799 def _check_abc_inheritance(self, abcmodule):
2800 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002801 self.assertIsInstance(f, abcmodule.IOBase)
2802 self.assertIsInstance(f, abcmodule.RawIOBase)
2803 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2804 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002805 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002806 self.assertIsInstance(f, abcmodule.IOBase)
2807 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2808 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2809 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002810 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002811 self.assertIsInstance(f, abcmodule.IOBase)
2812 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2813 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2814 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002815
2816 def test_abc_inheritance(self):
2817 # Test implementations inherit from their respective ABCs
2818 self._check_abc_inheritance(self)
2819
2820 def test_abc_inheritance_official(self):
2821 # Test implementations inherit from the official ABCs of the
2822 # baseline "io" module.
2823 self._check_abc_inheritance(io)
2824
Antoine Pitroue033e062010-10-29 10:38:18 +00002825 def _check_warn_on_dealloc(self, *args, **kwargs):
2826 f = open(*args, **kwargs)
2827 r = repr(f)
2828 with self.assertWarns(ResourceWarning) as cm:
2829 f = None
2830 support.gc_collect()
2831 self.assertIn(r, str(cm.warning.args[0]))
2832
2833 def test_warn_on_dealloc(self):
2834 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
2835 self._check_warn_on_dealloc(support.TESTFN, "wb")
2836 self._check_warn_on_dealloc(support.TESTFN, "w")
2837
2838 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
2839 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00002840 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00002841 for fd in fds:
2842 try:
2843 os.close(fd)
2844 except EnvironmentError as e:
2845 if e.errno != errno.EBADF:
2846 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00002847 self.addCleanup(cleanup_fds)
2848 r, w = os.pipe()
2849 fds += r, w
2850 self._check_warn_on_dealloc(r, *args, **kwargs)
2851 # When using closefd=False, there's no warning
2852 r, w = os.pipe()
2853 fds += r, w
2854 with warnings.catch_warnings(record=True) as recorded:
2855 open(r, *args, closefd=False, **kwargs)
2856 support.gc_collect()
2857 self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00002858
2859 def test_warn_on_dealloc_fd(self):
2860 self._check_warn_on_dealloc_fd("rb", buffering=0)
2861 self._check_warn_on_dealloc_fd("rb")
2862 self._check_warn_on_dealloc_fd("r")
2863
2864
Antoine Pitrou243757e2010-11-05 21:15:39 +00002865 def test_pickling(self):
2866 # Pickling file objects is forbidden
2867 for kwargs in [
2868 {"mode": "w"},
2869 {"mode": "wb"},
2870 {"mode": "wb", "buffering": 0},
2871 {"mode": "r"},
2872 {"mode": "rb"},
2873 {"mode": "rb", "buffering": 0},
2874 {"mode": "w+"},
2875 {"mode": "w+b"},
2876 {"mode": "w+b", "buffering": 0},
2877 ]:
2878 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
2879 with self.open(support.TESTFN, **kwargs) as f:
2880 self.assertRaises(TypeError, pickle.dumps, f, protocol)
2881
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01002882 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2883 def test_nonblock_pipe_write_bigbuf(self):
2884 self._test_nonblock_pipe_write(16*1024)
2885
2886 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2887 def test_nonblock_pipe_write_smallbuf(self):
2888 self._test_nonblock_pipe_write(1024)
2889
2890 def _set_non_blocking(self, fd):
2891 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
2892 self.assertNotEqual(flags, -1)
2893 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
2894 self.assertEqual(res, 0)
2895
2896 def _test_nonblock_pipe_write(self, bufsize):
2897 sent = []
2898 received = []
2899 r, w = os.pipe()
2900 self._set_non_blocking(r)
2901 self._set_non_blocking(w)
2902
2903 # To exercise all code paths in the C implementation we need
2904 # to play with buffer sizes. For instance, if we choose a
2905 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
2906 # then we will never get a partial write of the buffer.
2907 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
2908 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
2909
2910 with rf, wf:
2911 for N in 9999, 73, 7574:
2912 try:
2913 i = 0
2914 while True:
2915 msg = bytes([i % 26 + 97]) * N
2916 sent.append(msg)
2917 wf.write(msg)
2918 i += 1
2919
2920 except self.BlockingIOError as e:
2921 self.assertEqual(e.args[0], errno.EAGAIN)
2922 sent[-1] = sent[-1][:e.characters_written]
2923 received.append(rf.read())
2924 msg = b'BLOCKED'
2925 wf.write(msg)
2926 sent.append(msg)
2927
2928 while True:
2929 try:
2930 wf.flush()
2931 break
2932 except self.BlockingIOError as e:
2933 self.assertEqual(e.args[0], errno.EAGAIN)
2934 self.assertEqual(e.characters_written, 0)
2935 received.append(rf.read())
2936
2937 received += iter(rf.read, None)
2938
2939 sent, received = b''.join(sent), b''.join(received)
2940 self.assertTrue(sent == received)
2941 self.assertTrue(wf.closed)
2942 self.assertTrue(rf.closed)
2943
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002944class CMiscIOTest(MiscIOTest):
2945 io = io
2946
2947class PyMiscIOTest(MiscIOTest):
2948 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002949
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002950
2951@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2952class SignalsTest(unittest.TestCase):
2953
2954 def setUp(self):
2955 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2956
2957 def tearDown(self):
2958 signal.signal(signal.SIGALRM, self.oldalrm)
2959
2960 def alarm_interrupt(self, sig, frame):
2961 1/0
2962
2963 @unittest.skipUnless(threading, 'Threading required for this test.')
Victor Stinnercd1aa0d2011-07-04 11:48:17 +02002964 @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
2965 'issue #12429: skip test on FreeBSD <= 7')
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002966 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2967 """Check that a partial write, when it gets interrupted, properly
Antoine Pitroud843c2d2011-02-25 21:34:39 +00002968 invokes the signal handler, and bubbles up the exception raised
2969 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002970 read_results = []
2971 def _read():
2972 s = os.read(r, 1)
2973 read_results.append(s)
2974 t = threading.Thread(target=_read)
2975 t.daemon = True
2976 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00002977 fdopen_kwargs["closefd"] = False
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002978 try:
2979 wio = self.io.open(w, **fdopen_kwargs)
2980 t.start()
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07002981 signal.alarm(1)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002982 # Fill the pipe enough that the write will be blocking.
2983 # It will be interrupted by the timer armed above. Since the
2984 # other thread has read one byte, the low-level write will
2985 # return with a successful (partial) result rather than an EINTR.
2986 # The buffered IO layer must check for pending signal
2987 # handlers, which in this case will invoke alarm_interrupt().
2988 self.assertRaises(ZeroDivisionError,
2989 wio.write, item * (1024 * 1024))
2990 t.join()
2991 # We got one byte, get another one and check that it isn't a
2992 # repeat of the first one.
2993 read_results.append(os.read(r, 1))
2994 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2995 finally:
2996 os.close(w)
2997 os.close(r)
2998 # This is deliberate. If we didn't close the file descriptor
2999 # before closing wio, wio would try to flush its internal
3000 # buffer, and block again.
3001 try:
3002 wio.close()
3003 except IOError as e:
3004 if e.errno != errno.EBADF:
3005 raise
3006
3007 def test_interrupted_write_unbuffered(self):
3008 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3009
3010 def test_interrupted_write_buffered(self):
3011 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3012
3013 def test_interrupted_write_text(self):
3014 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3015
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003016 def check_reentrant_write(self, data, **fdopen_kwargs):
3017 def on_alarm(*args):
3018 # Will be called reentrantly from the same thread
3019 wio.write(data)
3020 1/0
3021 signal.signal(signal.SIGALRM, on_alarm)
3022 r, w = os.pipe()
3023 wio = self.io.open(w, **fdopen_kwargs)
3024 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003025 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003026 # Either the reentrant call to wio.write() fails with RuntimeError,
3027 # or the signal handler raises ZeroDivisionError.
3028 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3029 while 1:
3030 for i in range(100):
3031 wio.write(data)
3032 wio.flush()
3033 # Make sure the buffer doesn't fill up and block further writes
3034 os.read(r, len(data) * 100)
3035 exc = cm.exception
3036 if isinstance(exc, RuntimeError):
3037 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3038 finally:
3039 wio.close()
3040 os.close(r)
3041
3042 def test_reentrant_write_buffered(self):
3043 self.check_reentrant_write(b"xy", mode="wb")
3044
3045 def test_reentrant_write_text(self):
3046 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3047
Antoine Pitroud843c2d2011-02-25 21:34:39 +00003048 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3049 """Check that a buffered read, when it gets interrupted (either
3050 returning a partial result or EINTR), properly invokes the signal
3051 handler and retries if the latter returned successfully."""
3052 r, w = os.pipe()
3053 fdopen_kwargs["closefd"] = False
3054 def alarm_handler(sig, frame):
3055 os.write(w, b"bar")
3056 signal.signal(signal.SIGALRM, alarm_handler)
3057 try:
3058 rio = self.io.open(r, **fdopen_kwargs)
3059 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003060 signal.alarm(1)
Antoine Pitroud843c2d2011-02-25 21:34:39 +00003061 # Expected behaviour:
3062 # - first raw read() returns partial b"foo"
3063 # - second raw read() returns EINTR
3064 # - third raw read() returns b"bar"
3065 self.assertEqual(decode(rio.read(6)), "foobar")
3066 finally:
3067 rio.close()
3068 os.close(w)
3069 os.close(r)
3070
Antoine Pitrou20db5112011-08-19 20:32:34 +02003071 def test_interrupted_read_retry_buffered(self):
Antoine Pitroud843c2d2011-02-25 21:34:39 +00003072 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3073 mode="rb")
3074
Antoine Pitrou20db5112011-08-19 20:32:34 +02003075 def test_interrupted_read_retry_text(self):
Antoine Pitroud843c2d2011-02-25 21:34:39 +00003076 self.check_interrupted_read_retry(lambda x: x,
3077 mode="r")
3078
3079 @unittest.skipUnless(threading, 'Threading required for this test.')
3080 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3081 """Check that a buffered write, when it gets interrupted (either
3082 returning a partial result or EINTR), properly invokes the signal
3083 handler and retries if the latter returned successfully."""
3084 select = support.import_module("select")
3085 # A quantity that exceeds the buffer size of an anonymous pipe's
3086 # write end.
3087 N = 1024 * 1024
3088 r, w = os.pipe()
3089 fdopen_kwargs["closefd"] = False
3090 # We need a separate thread to read from the pipe and allow the
3091 # write() to finish. This thread is started after the SIGALRM is
3092 # received (forcing a first EINTR in write()).
3093 read_results = []
3094 write_finished = False
3095 def _read():
3096 while not write_finished:
3097 while r in select.select([r], [], [], 1.0)[0]:
3098 s = os.read(r, 1024)
3099 read_results.append(s)
3100 t = threading.Thread(target=_read)
3101 t.daemon = True
3102 def alarm1(sig, frame):
3103 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003104 signal.alarm(1)
Antoine Pitroud843c2d2011-02-25 21:34:39 +00003105 def alarm2(sig, frame):
3106 t.start()
3107 signal.signal(signal.SIGALRM, alarm1)
3108 try:
3109 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003110 signal.alarm(1)
Antoine Pitroud843c2d2011-02-25 21:34:39 +00003111 # Expected behaviour:
3112 # - first raw write() is partial (because of the limited pipe buffer
3113 # and the first alarm)
3114 # - second raw write() returns EINTR (because of the second alarm)
3115 # - subsequent write()s are successful (either partial or complete)
3116 self.assertEqual(N, wio.write(item * N))
3117 wio.flush()
3118 write_finished = True
3119 t.join()
3120 self.assertEqual(N, sum(len(x) for x in read_results))
3121 finally:
3122 write_finished = True
3123 os.close(w)
3124 os.close(r)
3125 # This is deliberate. If we didn't close the file descriptor
3126 # before closing wio, wio would try to flush its internal
3127 # buffer, and could block (in case of failure).
3128 try:
3129 wio.close()
3130 except IOError as e:
3131 if e.errno != errno.EBADF:
3132 raise
3133
Antoine Pitrou20db5112011-08-19 20:32:34 +02003134 def test_interrupted_write_retry_buffered(self):
Antoine Pitroud843c2d2011-02-25 21:34:39 +00003135 self.check_interrupted_write_retry(b"x", mode="wb")
3136
Antoine Pitrou20db5112011-08-19 20:32:34 +02003137 def test_interrupted_write_retry_text(self):
Antoine Pitroud843c2d2011-02-25 21:34:39 +00003138 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3139
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003140
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003141class CSignalsTest(SignalsTest):
3142 io = io
3143
3144class PySignalsTest(SignalsTest):
3145 io = pyio
3146
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003147 # Handling reentrancy issues would slow down _pyio even more, so the
3148 # tests are disabled.
3149 test_reentrant_write_buffered = None
3150 test_reentrant_write_text = None
3151
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003152
Guido van Rossum28524c72007-02-27 05:47:44 +00003153def test_main():
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003154 tests = (CIOTest, PyIOTest,
3155 CBufferedReaderTest, PyBufferedReaderTest,
3156 CBufferedWriterTest, PyBufferedWriterTest,
3157 CBufferedRWPairTest, PyBufferedRWPairTest,
3158 CBufferedRandomTest, PyBufferedRandomTest,
3159 StatefulIncrementalDecoderTest,
3160 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3161 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003162 CMiscIOTest, PyMiscIOTest,
3163 CSignalsTest, PySignalsTest,
3164 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003165
3166 # Put the namespaces of the IO module we are testing and some useful mock
3167 # classes in the __dict__ of each test.
3168 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou328ec742010-09-14 18:37:24 +00003169 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003170 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3171 c_io_ns = {name : getattr(io, name) for name in all_members}
3172 py_io_ns = {name : getattr(pyio, name) for name in all_members}
3173 globs = globals()
3174 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3175 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3176 # Avoid turning open into a bound method.
3177 py_io_ns["open"] = pyio.OpenWrapper
3178 for test in tests:
3179 if test.__name__.startswith("C"):
3180 for name, obj in c_io_ns.items():
3181 setattr(test, name, obj)
3182 elif test.__name__.startswith("Py"):
3183 for name, obj in py_io_ns.items():
3184 setattr(test, name, obj)
3185
3186 support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00003187
3188if __name__ == "__main__":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003189 test_main()