blob: 249dc304b46d0a8340eb976506b3dffb647b218d [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as an attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import random
Guido van Rossum28524c72007-02-27 05:47:44 +000027import unittest
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000028import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000029import abc
Antoine Pitroub46b9d52010-08-21 19:09:32 +000030import signal
31import errno
Antoine Pitroue033e062010-10-29 10:38:18 +000032import warnings
Antoine Pitrou243757e2010-11-05 21:15:39 +000033import pickle
Serhiy Storchaka441d30f2013-01-19 12:26:26 +020034import _testcapi
Georg Brandl1b37e872010-03-14 10:45:50 +000035from itertools import cycle, count
Antoine Pitrou131a4892012-10-16 22:57:11 +020036from collections import deque, UserList
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000038
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000039import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000040import io # C implementation of io
41import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000042try:
43 import threading
44except ImportError:
45 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010046try:
47 import fcntl
48except ImportError:
49 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000050
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000051def _default_chunk_size():
52 """Get the default TextIOWrapper chunk size"""
53 with open(__file__, "r", encoding="latin1") as f:
54 return f._CHUNK_SIZE
55
56
class MockRawIOWithoutRead:
    """Scripted raw stream that deliberately defines no read().

    Only readinto() is provided, so mixing this class with a RawIOBase
    exercises the default RawIOBase.read(), which calls readinto().

    *read_stack* is the sequence of chunks handed out by successive
    readinto() calls; a ``None`` entry makes that call return ``None``.
    """

    def __init__(self, read_stack=()):
        # Call counters first, then the scripted input / captured output.
        self._reads = 0
        self._extraneous_reads = 0
        self._write_stack = []
        self._read_stack = list(read_stack)

    # --- capability flags and fixed answers -------------------------------

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def fileno(self):
        return 42

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def truncate(self, pos=None):
        return pos

    # --- data transfer ----------------------------------------------------

    def write(self, b):
        """Record a bytes copy of *b* and report all of it as written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def readinto(self, buf):
        """Copy the next scripted chunk into *buf* and return its length.

        Returns 0 (and counts an extraneous read) once the script is
        exhausted, and ``None`` when the next scripted entry is ``None``.
        A chunk larger than *buf* is split: the remainder stays at the
        front of the script for the following call.
        """
        self._reads += 1
        capacity = len(buf)
        if not self._read_stack:
            self._extraneous_reads += 1
            return 0
        chunk = self._read_stack[0]
        if chunk is None:
            self._read_stack.pop(0)
            return None
        size = len(chunk)
        if size > capacity:
            # Fill the whole buffer and keep the leftover for later.
            buf[:] = chunk[:capacity]
            self._read_stack[0] = chunk[capacity:]
            return capacity
        self._read_stack.pop(0)
        buf[:size] = chunk
        return size
112
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead combined with the C ``io.RawIOBase``."""
115
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead combined with the pure-Python ``RawIOBase``."""
118
119
class MockRawIO(MockRawIOWithoutRead):
    """Scripted raw stream that also provides its own read().

    Each read() call hands back the next entry of the read stack unchanged,
    regardless of the size that was requested.
    """

    def read(self, n=None):
        """Return the next scripted chunk, or b"" once none is left.

        *n* is accepted for interface compatibility but is not used.  Reads
        issued after the script is exhausted are tallied in
        ``_extraneous_reads``.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted stack means end-of-data.  Anything else
            # (KeyboardInterrupt included) must propagate rather than be
            # silently turned into an empty read.
            self._extraneous_reads += 1
            return b""
129
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO combined with the C ``io.RawIOBase``."""
132
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO combined with the pure-Python ``RawIOBase``."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000135
Guido van Rossuma9e20242007-03-08 00:43:48 +0000136
class MisbehavedRawIO(MockRawIO):
    """Raw stream mock whose methods report deliberately bogus results."""

    def tell(self):
        # A negative position.
        return -456

    def seek(self, pos, whence):
        # A negative position, whatever was asked for.
        return -123

    def read(self, n=None):
        # Twice the data the parent mock produced.
        chunk = super().read(n)
        return chunk * 2

    def readinto(self, buf):
        # Let the parent fill the buffer, then claim five times its size.
        super().readinto(buf)
        claimed = len(buf) * 5
        return claimed

    def write(self, b):
        # Report twice as many bytes as the parent mock accepted.
        accepted = super().write(b)
        return accepted * 2
153
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO combined with the C ``io.RawIOBase``."""
156
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO combined with the pure-Python ``RawIOBase``."""
159
160
class CloseFailureIO(MockRawIO):
    """Raw stream mock whose first close() call fails with IOError."""

    # Class-level default; close() shadows it with an instance attribute.
    closed = 0

    def close(self):
        """Mark the stream closed and raise IOError, but only the first time.

        Later calls find ``closed`` already set and return silently.
        """
        if self.closed:
            return
        self.closed = 1
        raise IOError
168
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO combined with the C ``io.RawIOBase``."""
171
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO combined with the pure-Python ``RawIOBase``."""
174
175
class MockFileIO:
    """Mixin that logs the size of every read()/readinto() result.

    The real work is delegated through super(), so the class has to be
    combined with a stream class whose constructor accepts the initial data.
    """

    def __init__(self, data):
        # One entry per read()/readinto() call, in call order.
        self.read_history = []
        super().__init__(data)

    def readinto(self, b):
        """Delegate to the next class and log the value it returned."""
        count = super().readinto(b)
        self.read_history.append(count)
        return count

    def read(self, n=None):
        """Delegate to the next class and log the length of the result.

        A ``None`` result is logged as ``None``.
        """
        chunk = super().read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk
Guido van Rossum78892e42007-04-06 17:31:18 +0000191
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO logging on top of the C ``io.BytesIO``."""
Guido van Rossuma9e20242007-03-08 00:43:48 +0000194
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO logging on top of the pure-Python ``BytesIO``."""
197
198
class MockUnseekableIO:
    """Mixin that presents a stream as unseekable.

    The concrete class must provide an ``UnsupportedOperation`` attribute;
    it is the exception type raised by seek() and tell().
    """

    def seekable(self):
        return False

    def tell(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def seek(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error
208
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """Unseekable variant of the C ``io.BytesIO``."""

    # Exception type raised by the mixin's seek() and tell().
    UnsupportedOperation = io.UnsupportedOperation
211
class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """Unseekable variant of the pure-Python ``BytesIO``."""

    # Exception type raised by the mixin's seek() and tell().
    UnsupportedOperation = pyio.UnsupportedOperation
214
215
class MockNonBlockWriterIO:
    """Writer mock that can simulate a write that would block.

    Written data is collected in a list.  After block_on() has armed a
    blocker, write() stops in front of its first occurrence.
    """

    def __init__(self):
        self._blocker_char = None
        self._write_stack = []

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def pop_written(self):
        """Return everything written so far and empty the record."""
        written = b"".join(self._write_stack)
        self._write_stack.clear()
        return written

    def write(self, b):
        """Record *b*, honouring an armed blocker.

        Returns the number of bytes accepted, or ``None`` when the data
        starts with the blocker (in which case the blocker is disarmed).
        """
        data = bytes(b)
        blocker = self._blocker_char
        # -1 means "no blocker armed, or not present in this data".
        position = data.find(blocker) if blocker else -1
        if position > 0:
            # write data up to the first blocker
            self._write_stack.append(data[:position])
            return position
        if position == 0:
            # cancel blocker and indicate would block
            self._blocker_char = None
            return None
        self._write_stack.append(data)
        return len(data)
259
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO combined with the C ``io.RawIOBase``."""

    # Matching BlockingIOError type, exposed as a class attribute.
    BlockingIOError = io.BlockingIOError
262
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO combined with the pure-Python ``RawIOBase``."""

    # Matching BlockingIOError type, exposed as a class attribute.
    BlockingIOError = pyio.BlockingIOError
265
Guido van Rossuma9e20242007-03-08 00:43:48 +0000266
class IOTest(unittest.TestCase):
    """Generic checks for open(), FileIO, BytesIO and the io base classes.

    The implementation under test is reached only through attributes of
    ``self`` (``open``, ``FileIO``, ``BytesIO``, ``IOBase``,
    ``UnsupportedOperation``, ...).  None of them is defined in this class,
    so they have to be supplied from outside before the tests can run.
    """

    def setUp(self):
        # Start every test without a stale scratch file.
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        """Run a fixed write/seek/truncate sequence against *f*.

        The sequence ends with position 13 and a truncate to 12 bytes;
        test_raw_bytes_io checks the resulting content is b"hello world\\n".
        """
        self.assertEqual(f.write(b"blah."), 5)
        # truncate() must not move the file position.
        f.truncate(0)
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Run a fixed read/readinto/seek sequence against *f*.

        *f* must contain b"hello world\\n".  With *buffered* true the
        argument-less read() is exercised as well.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        # Only two bytes are left; the buffer keeps its size.
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by the large-file checks.
    LARGE = 2**31

    def large_file_ops(self, f):
        """Seek, write, truncate and read around offset ``LARGE`` in *f*."""
        # Real assertions instead of ``assert`` statements, so the checks
        # are not stripped when running under -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        # truncate() must not move the file position.
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        exc = self.UnsupportedOperation
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(exc, fp.read)
                self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "wb", buffering=0) as fp:
            self.assertRaises(exc, fp.read)
            self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "rb", buffering=0) as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(exc, fp.write, "blah")
            self.assertRaises(exc, fp.writelines, ["blah\n"])
            # Non-zero seeking from current or end pos
            self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
            self.assertRaises(exc, fp.seek, -1, self.SEEK_END)

    def test_open_handles_NUL_chars(self):
        # A file name with an embedded NUL must be rejected with TypeError,
        # whether given as str or as bytes.
        fn_with_NUL = 'foo\0bar'
        self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
        self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')

    def test_raw_file_io(self):
        # Unbuffered file: capability flags, then the shared op sequences.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        # Same as test_raw_file_io, with default buffering.
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        # The shared op sequences, run against an in-memory stream.
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; It takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                # Report a skip instead of silently returning, so the run
                # does not count this test as passed.
                self.skipTest(
                    "large file ops skipped on %s: requires %d bytes and a "
                    "long time; use 'regrtest.py -u largefile test_io' to "
                    "run it" % (sys.platform, self.LARGE))
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # The with statement must close the file on normal exit and when
        # the body raises.
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        # Collecting an unclosed FileIO subclass must call __del__, close()
        # and flush() in that order, and the data must reach the file.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        with support.check_warnings(('', ResourceWarning)):
            f = MyFileIO(support.TESTFN, "wb")
            f.write(b"xxx")
            del f
            support.gc_collect()
            self.assertEqual(record, [1, 2, 3])
            with self.open(support.TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        """Check destruction order for a subclass of *base*.

        Deleting the instance must trigger __del__, close() and flush() in
        that order, with instance attributes still readable in each.
        """
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        # Data written before the file is closed must be readable afterwards.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        # write() must accept an array and report its size in bytes.
        a = array.array('i', range(10))
        n = len(a.tobytes())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        # Reading from a closed file must raise ValueError, even when the
        # file was opened on a descriptor with closefd=False.
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        # The raw object's closefd attribute must reflect the open() argument.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        with support.check_warnings(('', ResourceWarning)):
            f = self.FileIO(support.TESTFN, "wb")
            f.write(b"abcxxx")
            # Self-reference, so only the cycle collector can free it.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed

    def test_multi_close(self):
        # Closing repeatedly is harmless; flushing afterwards is an error.
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")

    def test_types_have_dict(self):
        # Instances of these types must carry an instance __dict__.
        test = (
            self.IOBase(),
            self.RawIOBase(),
            self.TextIOBase(),
            self.StringIO(),
            self.BytesIO()
        )
        for obj in test:
            self.assertTrue(hasattr(obj, "__dict__"))

    def test_fileio_closefd(self):
        # Issue #4841
        with self.open(__file__, 'rb') as f1, \
             self.open(__file__, 'rb') as f2:
            fileio = self.FileIO(f1.fileno(), closefd=False)
            # .__init__() must not close f1
            fileio.__init__(f2.fileno(), closefd=False)
            f1.readline()
            # .close() must not close f2
            fileio.close()
            f2.readline()
649
650
class CIOTest(IOTest):
    """IOTest variant carrying one additional finalization check."""

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class CyclicIO(self.IOBase):
            def close(self):
                pass

        # A throwaway instance first, to populate the method cache.
        CyclicIO()
        instance = CyclicIO()
        # Self-reference: the instance can only be freed by the collector.
        instance.obj = instance
        wr = weakref.ref(instance)
        # Drop the class before the instance, then collect both.
        del CyclicIO, instance
        support.gc_collect()
        self.assertTrue(wr() is None, wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000670
class PyIOTest(IOTest):
    """IOTest variant that adds no tests of its own."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000673
Guido van Rossuma9e20242007-03-08 00:43:48 +0000674
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    #
    # This is a mixin: concrete test cases provide ``tp`` (the buffered class
    # under test) and the mock raw streams are looked up through ``self``.

    def test_detach(self):
        # detach() returns the wrapped raw stream; detaching twice fails.
        underlying = self.MockRawIO()
        wrapper = self.tp(underlying)
        self.assertIs(wrapper.detach(), underlying)
        with self.assertRaises(ValueError):
            wrapper.detach()

    def test_fileno(self):
        # The expected descriptor (42) is whatever the mock raw stream
        # reports; the buffered object must pass it through.
        underlying = self.MockRawIO()
        wrapper = self.tp(underlying)

        self.assertEqual(42, wrapper.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        underlying = self.MockRawIO()
        wrapper = self.tp(underlying)
        # Invalid whence
        for bad_whence in (-1, 3):
            self.assertRaises(ValueError, wrapper.seek, 0, bad_whence)

    def test_override_destructor(self):
        # A subclass overriding __del__/close/flush must see them invoked in
        # that order when the object is reclaimed.  flush() is only expected
        # for writable objects.
        calls = []
        base = self.tp

        class MyBufferedIO(base):
            def __del__(self):
                calls.append(1)
                try:
                    inherited_del = super().__del__
                except AttributeError:
                    pass
                else:
                    inherited_del()

            def close(self):
                calls.append(2)
                super().close()

            def flush(self):
                calls.append(3)
                super().flush()

        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        expected = [1, 2, 3] if writable else [1, 2]
        self.assertEqual(calls, expected)

    def test_context_manager(self):
        # Test usability as a context manager
        underlying = self.MockRawIO()
        bufio = self.tp(underlying)

        def enter_and_leave():
            with bufio:
                pass

        enter_and_leave()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, enter_and_leave)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()

        def touch_missing_attribute():
            self.tp(rawio).xyzzy

        with support.captured_output("stderr") as captured:
            self.assertRaises(AttributeError, touch_missing_attribute)
        output = captured.getvalue().strip()
        if output:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(output.splitlines()), 1)
            self.assertTrue(output.startswith("Exception IOError: "), output)
            self.assertTrue(output.endswith(" ignored"), output)

    def test_repr(self):
        # repr() shows the qualified class name and, once the raw stream has
        # a ``name`` attribute, the repr of that name.
        underlying = self.MockRawIO()
        wrapper = self.tp(underlying)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(wrapper), "<%s>" % clsname)
        underlying.name = "dummy"
        self.assertEqual(repr(wrapper), "<%s name='dummy'>" % clsname)
        underlying.name = b"dummy"
        self.assertEqual(repr(wrapper), "<%s name=b'dummy'>" % clsname)

    def test_flush_error_on_close(self):
        underlying = self.MockRawIO()

        def bad_flush():
            raise IOError()

        underlying.flush = bad_flush
        wrapper = self.tp(underlying)
        with self.assertRaises(IOError):
            wrapper.close()  # exception not swallowed

    def test_multi_close(self):
        # close() may be called repeatedly; other operations then fail.
        underlying = self.MockRawIO()
        wrapper = self.tp(underlying)
        for _ in range(3):
            wrapper.close()
        self.assertRaises(ValueError, wrapper.flush)

    def test_unseekable(self):
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        unsupported = self.UnsupportedOperation
        self.assertRaises(unsupported, bufio.tell)
        self.assertRaises(unsupported, bufio.seek, 0)

    def test_readonly_attributes(self):
        # The ``raw`` attribute cannot be rebound from outside.
        underlying = self.MockRawIO()
        wrapper = self.tp(underlying)
        replacement = self.MockRawIO()
        self.assertRaises(AttributeError, setattr, wrapper, "raw", replacement)
794
Guido van Rossum78892e42007-04-06 17:31:18 +0000795
class SizeofTest:
    # Mixin checking that sys.getsizeof() accounts for the internal buffer.
    # Relies on ``tp`` and ``MockRawIO`` being available on the instance.

    @support.cpython_only
    def test_sizeof(self):
        small, large = 4096, 8192
        # Size of the object once the buffer itself is subtracted.
        small_buffered = self.tp(self.MockRawIO(), buffer_size=small)
        overhead = sys.getsizeof(small_buffered) - small
        # The same overhead plus the larger buffer must give the new size.
        large_buffered = self.tp(self.MockRawIO(), buffer_size=large)
        self.assertEqual(sys.getsizeof(large_buffered), overhead + large)
808
809
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Read-side tests for the buffered class bound to ``tp`` by a subclass.
    # Mock raw streams and io names are looked up through ``self``.
    read_mode = "rb"

    def test_constructor(self):
        # __init__ may be re-run on a live object with valid sizes; the
        # non-positive sizes tried here must raise ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        for good_size in (1024, 16):
            bufio.__init__(rawio, buffer_size=good_size)
        self.assertEqual(b"abc", bufio.read())
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_read(self):
        for size in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(size))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # read1() must be served from the buffer when possible; the raw
        # read counter kept by the mock shows when a new raw read happened.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        steps = ((b"c", 1), (b"d", 2), (b"efg", 3), (b"", 4))
        for expected, raw_reads in steps:
            self.assertEqual(expected, bufio.read1(100))
            self.assertEqual(rawio._reads, raw_reads)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        # Each step: (bytes reported as read, buffer content afterwards).
        # A short read leaves the tail of the buffer untouched.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        target = bytearray(2)
        steps = ((2, b"ab"), (2, b"cd"), (2, b"ef"), (1, b"gf"), (0, b"gf"))
        for count_read, content in steps:
            self.assertEqual(bufio.readinto(target), count_read)
            self.assertEqual(target, content)

    def test_readlines(self):
        def fresh():
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)

        everything = [b"abc\n", b"d\n", b"ef"]
        self.assertEqual(fresh().readlines(), everything)
        self.assertEqual(fresh().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(fresh().readlines(None), everything)

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # (buffer size, sizes passed to bufio.read(), expected raw reads)
        scenarios = (
            (100, [3, 1, 4, 8], [dlen, 0]),
            (100, [3, 3, 3], [dlen]),
            (4, [1, 2, 4, 2], [4, 4, 1]),
        )

        for bufsize, buf_read_sizes, raw_read_sizes in scenarios:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            offset = 0
            for nbytes in buf_read_sizes:
                chunk = bufio.read(nbytes)
                self.assertEqual(chunk, data[offset:offset + nbytes])
                offset += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        # Same check directly on the raw stream's readall().
        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())

    def test_read_past_eof(self):
        # Asking for more than is available returns what there is.
        bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(bytearray(values))
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []

                def reader():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            chunk = bufio.read(n)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as e:
                        errors.append(e)
                        raise

                threads = [threading.Thread(target=reader) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02)  # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                combined = b''.join(results)
                # Every byte value must have been read exactly N times.
                for i in range(256):
                    single = bytes(bytearray([i]))
                    self.assertEqual(combined.count(single), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        # Extends the common check: tell()/seek() must keep failing after
        # some data has been read into the buffer.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        unsupported = self.UnsupportedOperation
        self.assertRaises(unsupported, bufio.tell)
        self.assertRaises(unsupported, bufio.seek, 0)
        bufio.read(1)
        self.assertRaises(unsupported, bufio.seek, 0)
        self.assertRaises(unsupported, bufio.tell)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            wanted = b"x" * n
            # Simple case: one raw read is enough to satisfy the request.
            rawio = self.MockRawIO([b"x" * n])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), wanted)
            self.assertEqual(rawio._extraneous_reads, 0,
                "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), wanted)
            self.assertEqual(rawio._extraneous_reads, 0,
                "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
999
1000
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the reader tests against io.BufferedReader and adds checks that
    # only apply to that implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After a failed re-initialisation the object must refuse to read
        # rather than operate on a half-initialised state.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        #
        # Opening TESTFN in "w+b" mode creates it on disk; make sure it is
        # removed again so the file does not leak into later tests.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Self-reference: only the cycle collector can reclaim the object.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001041
class PyBufferedReaderTest(BufferedReaderTest):
    """Run the BufferedReaderTest cases against ``pyio.BufferedReader``."""

    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001044
Guido van Rossuma9e20242007-03-08 00:43:48 +00001045
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Write-side tests for the buffered class bound to ``tp`` by a subclass.
    # Mock raw streams and io names are looked up through ``self``.
    write_mode = "wb"

    def test_constructor(self):
        # __init__ may be re-run on a live object with valid sizes; the
        # non-positive sizes tried here must raise ValueError, and the
        # object must still be usable after a successful re-init.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # detach() must push pending buffered data to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Helper (not a test by itself): write a large payload in chunks of
        # growing size, calling intermediate_func(bufio) after every write,
        # then verify that the raw stream received exactly the payload.
        # Lots of writes, test the flushed output is as expected.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            # Clamp the last chunk to what is left of the payload.
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seeking away and back between writes must not disturb the output;
        # exercised with absolute (whence=0) and relative (whence=1) seeks.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        # Seeking flushes pending data, so overwriting after a rewind is
        # visible in the raw stream once the next seek happens.
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        l = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # Same as test_writelines, but with a non-list sequence type.
        l = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        # Non-bytes items, a non-iterable and a str are all rejected.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
        self.assertRaises(TypeError, bufio.writelines, None)
        self.assertRaises(TypeError, bufio.writelines, 'abc')

    def test_destructor(self):
        # Destroying the buffered object must flush pending data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        #
        # The test creates TESTFN on disk; remove it afterwards so it does
        # not leak into later tests.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            # The position is left where the write ended, not at the new EOF.
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Chunks may land in any order, but every byte value must have
            # been written exactly N times.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing a third positional argument must emit the deprecation
        # warning checked below.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001284
1285
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the writer tests against io.BufferedWriter and adds checks that
    # only apply to that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After a failed re-initialisation the object must refuse to write
        # rather than operate on a half-initialised state.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        #
        # The test creates TESTFN on disk; remove it afterwards so it does
        # not leak into later tests.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Self-reference: only the cycle collector can reclaim the object.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1323
1324
class PyBufferedWriterTest(BufferedWriterTest):
    """Run the BufferedWriterTest cases against ``pyio.BufferedWriter``."""

    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001327
Guido van Rossum01a27522007-03-07 01:00:12 +00001328class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001329
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001330 def test_constructor(self):
1331 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001332 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001333
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001334 def test_detach(self):
1335 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1336 self.assertRaises(self.UnsupportedOperation, pair.detach)
1337
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001338 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna8fbddf12010-03-17 20:29:51 +00001339 with support.check_warnings(("max_buffer_size is deprecated",
1340 DeprecationWarning)):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001341 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001342
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001343 def test_constructor_with_not_readable(self):
1344 class NotReadable(MockRawIO):
1345 def readable(self):
1346 return False
1347
1348 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1349
1350 def test_constructor_with_not_writeable(self):
1351 class NotWriteable(MockRawIO):
1352 def writable(self):
1353 return False
1354
1355 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1356
1357 def test_read(self):
1358 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1359
1360 self.assertEqual(pair.read(3), b"abc")
1361 self.assertEqual(pair.read(1), b"d")
1362 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001363 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1364 self.assertEqual(pair.read(None), b"abc")
1365
1366 def test_readlines(self):
1367 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1368 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1369 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1370 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001371
1372 def test_read1(self):
1373 # .read1() is delegated to the underlying reader object, so this test
1374 # can be shallow.
1375 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1376
1377 self.assertEqual(pair.read1(3), b"abc")
1378
1379 def test_readinto(self):
1380 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1381
1382 data = bytearray(5)
1383 self.assertEqual(pair.readinto(data), 5)
1384 self.assertEqual(data, b"abcde")
1385
1386 def test_write(self):
1387 w = self.MockRawIO()
1388 pair = self.tp(self.MockRawIO(), w)
1389
1390 pair.write(b"abc")
1391 pair.flush()
1392 pair.write(b"def")
1393 pair.flush()
1394 self.assertEqual(w._write_stack, [b"abc", b"def"])
1395
1396 def test_peek(self):
1397 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1398
1399 self.assertTrue(pair.peek(3).startswith(b"abc"))
1400 self.assertEqual(pair.read(3), b"abc")
1401
1402 def test_readable(self):
1403 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1404 self.assertTrue(pair.readable())
1405
1406 def test_writeable(self):
1407 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1408 self.assertTrue(pair.writable())
1409
def test_seekable(self):
    # BufferedRWPairs are never seekable, even if their readers and
    # writers are.
    reader, writer = self.MockRawIO(), self.MockRawIO()
    self.assertFalse(self.tp(reader, writer).seekable())
1415
1416 # .flush() is delegated to the underlying writer object and has been
1417 # tested in the test_write method.
1418
def test_close_and_closed(self):
    # ``closed`` flips from false to true across close().
    rw = self.tp(self.MockRawIO(), self.MockRawIO())
    closed_before = rw.closed
    self.assertFalse(closed_before)
    rw.close()
    closed_after = rw.closed
    self.assertTrue(closed_after)
1424
def test_isatty(self):
    # The pair is a tty as soon as either side says it is one.
    class SelectableIsAtty(MockRawIO):
        def __init__(self, isatty):
            MockRawIO.__init__(self)
            self._isatty = isatty

        def isatty(self):
            return self._isatty

    # (reader is a tty, writer is a tty), in the order originally checked.
    combinations = [(False, False), (True, False), (False, True), (True, True)]
    for reader_tty, writer_tty in combinations:
        rw = self.tp(SelectableIsAtty(reader_tty), SelectableIsAtty(writer_tty))
        if reader_tty or writer_tty:
            self.assertTrue(rw.isatty())
        else:
            self.assertFalse(rw.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001445
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPairTest checks against the ``io`` module's
    # BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001448
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPairTest checks against the ``pyio``
    # module's BufferedRWPair.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001451
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001452
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Shared checks for a buffered object that both reads and writes.
    # Concrete subclasses bind ``tp`` to the type under test.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Run both parents' constructor checks, reader first.
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_constructor(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        stream = self.tp(raw, 8)

        self.assertEqual(b"as", stream.read(2))
        for payload in (b"ddd", b"eee"):
            stream.write(payload)
        # The writes are still buffered, nothing reached the raw object.
        self.assertFalse(raw._write_stack)
        self.assertEqual(b"ghjk", stream.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        backing = self.BytesIO(b"asdfghjkl")
        stream = self.tp(backing)

        self.assertEqual(b"as", stream.read(2))
        self.assertEqual(2, stream.tell())
        stream.seek(0, 0)
        self.assertEqual(b"asdf", stream.read(4))

        stream.write(b"123f")
        stream.seek(0, 0)
        self.assertEqual(b"asdf123fl", stream.read())
        self.assertEqual(9, stream.tell())
        # (offset, whence, position expected afterwards)
        for offset, whence, position in ((-4, 2, 5), (2, 1, 7)):
            stream.seek(offset, whence)
            self.assertEqual(position, stream.tell())
        self.assertEqual(b"fl", stream.read(11))
        stream.flush()
        self.assertEqual(b"asdf123fl", backing.getvalue())

        # A float offset is rejected.
        with self.assertRaises(TypeError):
            stream.seek(0.0)

    def check_flush_and_read(self, read_func):
        # ``read_func(bufio[, n])`` abstracts over read/readinto/peek so the
        # same interleaving of reads, writes and flushes can be reused.
        backing = self.BytesIO(b"abcdefghi")
        stream = self.tp(backing)

        self.assertEqual(b"ab", read_func(stream, 2))
        stream.write(b"12")
        self.assertEqual(b"ef", read_func(stream, 2))
        self.assertEqual(6, stream.tell())
        stream.flush()
        self.assertEqual(6, stream.tell())
        self.assertEqual(b"ghi", read_func(stream))
        # Modify the raw stream behind the buffered object's back.
        backing.seek(0, 0)
        backing.write(b"XYZ")
        # flush() resets the read buffer
        stream.flush()
        stream.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(stream, 3))

    def test_flush_and_read(self):
        def _read(bufio, *args):
            return bufio.read(*args)
        self.check_flush_and_read(_read)

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # A negative size means "everything": use a generous buffer.
            capacity = 9999 if n < 0 else n
            target = bytearray(capacity)
            filled = bufio.readinto(target)
            return bytes(target[:filled])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            peeked = bufio.peek(n)
            data = peeked if n == -1 else peeked[:n]
            # peek() does not advance, so step over what was "read".
            bufio.seek(len(data), 1)
            return data
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        backing = self.BytesIO(b"abcdefghi")
        stream = self.tp(backing)

        for payload in (b"123", b"45"):
            stream.write(payload)
            stream.flush()
        stream.seek(0, 0)
        self.assertEqual(b"12345fghi", backing.getvalue())
        self.assertEqual(b"12345fghi", stream.read())

    def test_threads(self):
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_threads(self)

    def test_writes_and_peek(self):
        def _plain_peek(bufio):
            bufio.peek(1)
        self.check_writes(_plain_peek)

        def _peek_one_back(bufio):
            # Peek at the previous byte, then restore the position.
            saved = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(saved, 0)
        self.check_writes(_peek_one_back)

    def test_writes_and_reads(self):
        def _reread_last(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_reread_last)

    def test_writes_and_read1s(self):
        def _reread_last(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_reread_last)

    def test_writes_and_readintos(self):
        def _reread_last(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_reread_last)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in (1, 5):
            backing = self.BytesIO(b"A" * 10)
            stream = self.tp(backing, 4)
            end_position = overwrite_size + 1
            # Trigger readahead
            self.assertEqual(stream.read(1), b"A")
            self.assertEqual(stream.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            stream.write(b"B" * overwrite_size)
            self.assertEqual(stream.tell(), end_position)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            stream.flush()
            self.assertEqual(stream.tell(), end_position)
            expected = b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size)
            self.assertEqual(backing.getvalue(), expected)

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards /
        # writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        original = b"\x80\x81\x82\x83\x84"
        size = len(original)
        for i in range(size):
            for j in range(i, size):
                backing = self.BytesIO(original)
                stream = self.tp(backing, 100)
                mutate(stream, i, j)
                stream.flush()
                # Index j is assigned first so that i wins when i == j.
                expected = bytearray(original)
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(backing.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        backing = self.BytesIO(b"A" * 10)
        stream = self.tp(backing, 100)
        # the read buffer gets filled
        self.assertEqual(stream.read(2), b"AA")
        self.assertEqual(stream.truncate(), 2)
        # the write buffer increases
        self.assertEqual(stream.write(b"BB"), 2)
        self.assertEqual(stream.truncate(), 4)

    def test_misbehaved_io(self):
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as raw, self.tp(raw, 100) as f:
            f.write(b"1")
            self.assertEqual(f.read(1), b'b')
            f.write(b'2')
            self.assertEqual(f.read1(1), b'd')
            f.write(b'3')
            one_byte = bytearray(1)
            f.readinto(one_byte)
            self.assertEqual(one_byte, b'f')
            f.write(b'4')
            self.assertEqual(f.peek(1), b'h')
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as raw, self.tp(raw, 100) as f:
            self.assertEqual(f.read(1), b'a')
            f.write(b"2")
            self.assertEqual(f.read(1), b'c')
            f.flush()
            self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        with self.BytesIO(b'ab\ncdef\ng\n') as raw, self.tp(raw) as f:
            # Each one-byte write is followed by a readline() of the
            # remainder of that line.
            steps = ((b'1', b'b\n'), (b'2', b'def\n'), (b'3', b'\n'))
            for written, rest_of_line in steps:
                f.write(written)
                self.assertEqual(f.readline(), rest_of_line)
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None
1676
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    # BufferedRandom checks bound to the ``io`` module's implementation.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Re-initialising with an absurd buffer size must fail cleanly.
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            bufio.__init__(rawio, sys.maxsize)

    def test_garbage_collection(self):
        for parent in (CBufferedReaderTest, CBufferedWriterTest):
            parent.test_garbage_collection(self)
1693
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the shared BufferedRandomTest checks against the ``pyio``
    # module's BufferedRandom.
    tp = pyio.BufferedRandom
1696
1697
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001698# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1699# properties:
1700# - A single output character can correspond to many bytes of input.
1701# - The number of input bytes to complete the character can be
1702# undetermined until the last input byte is received.
1703# - The number of input bytes can vary depending on previous input.
1704# - A single input byte can correspond to many characters of output.
1705# - The number of output characters can be undetermined until the
1706# last input byte is received.
1707# - The number of output characters can vary depending on previous input.
1708
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and nothing buffered."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags) with flags == 0 right after reset()."""
        # XOR with 1 maps the initial I = O = 1 onto zero flags.
        i, o = self.i ^ 1, self.o ^ 1  # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        buffer, io = state
        self.buffer = bytearray(buffer)
        # Inverse of the packing done in getstate().
        i, o = divmod(io, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Decode *input* (an iterable of byte values) and return the text.

        Bytes that do not complete a word stay in the buffer for the next
        call; when *final* is true a pending word is emitted as well.
        """
        pieces = []
        for b in input:
            # self.i is re-read every iteration because processing an 'i'
            # word changes the mode for the bytes that follow it.
            if self.i == 0:  # variable-length, terminated with period
                if b == ord('.'):
                    if self.buffer:  # extra periods are ignored
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else:  # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer:  # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        'i'/'o' control words update the lengths and produce no output.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o  # pad out with hyphens
            if self.o:
                output = output[:self.o]  # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # The codec lookup below only answers while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function for the 'test_decoder' codec.

        Returns a CodecInfo whose incremental decoder is this class when the
        codec is enabled and *name* matches; otherwise returns None.
        """
        if cls.codecEnabled and name == 'test_decoder':
            # Encoding is borrowed from latin-1; only incremental decoding
            # is provided by this class.
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
1793
# Register the lookup function for the 'test_decoder' codec.  The lookup
# returns nothing while StatefulIncrementalDecoder.codecEnabled is false,
# which is its default, so a test has to turn the flag on before using it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1797
1798
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001799class StatefulIncrementalDecoderTest(unittest.TestCase):
1800 """
1801 Make sure the StatefulIncrementalDecoder actually works.
1802 """
1803
1804 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001805 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001806 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001807 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001808 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001809 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001810 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001811 # I=0, O=6 (variable-length input, fixed-length output)
1812 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1813 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001814 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001815 # I=6, O=3 (fixed-length input > fixed-length output)
1816 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1817 # I=0, then 3; O=29, then 15 (with longer output)
1818 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1819 'a----------------------------.' +
1820 'b----------------------------.' +
1821 'cde--------------------------.' +
1822 'abcdefghijabcde.' +
1823 'a.b------------.' +
1824 '.c.------------.' +
1825 'd.e------------.' +
1826 'k--------------.' +
1827 'l--------------.' +
1828 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001829 ]
1830
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001831 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001832 # Try a few one-shot test cases.
1833 for input, eof, output in self.test_cases:
1834 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001835 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001836
1837 # Also test an unfinished decode, followed by forcing EOF.
1838 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001839 self.assertEqual(d.decode(b'oiabcd'), '')
1840 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001841
1842class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001843
def setUp(self):
    # Start every test without a leftover scratch file.
    support.unlink(support.TESTFN)
    # Sample data mixing "\r\n", "\r" and "\n" line endings, plus the text
    # obtained once every ending is normalised to "\n".
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = "AAA\nBBB\nCCC\nDDD\nEEE\n"
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001848
def tearDown(self):
    # Remove the scratch file a test may have created.
    scratch = support.TESTFN
    support.unlink(scratch)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001851
def test_constructor(self):
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    wrapper = self.TextIOWrapper(buffered)

    # Re-running __init__ replaces the encoding and the buffering mode.
    wrapper.__init__(buffered, encoding="latin1", newline="\r\n")
    self.assertEqual(wrapper.encoding, "latin1")
    self.assertEqual(wrapper.line_buffering, False)

    wrapper.__init__(buffered, encoding="utf8", line_buffering=True)
    self.assertEqual(wrapper.encoding, "utf8")
    self.assertEqual(wrapper.line_buffering, True)
    self.assertEqual("\xe9\n", wrapper.readline())

    # A newline of the wrong type or with an unsupported value is rejected.
    with self.assertRaises(TypeError):
        wrapper.__init__(buffered, newline=42)
    with self.assertRaises(ValueError):
        wrapper.__init__(buffered, newline='xyzzy')
1865
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)

    # detach() hands back the very buffer object the wrapper was given.
    wrapper = self.TextIOWrapper(buffered)
    self.assertIs(wrapper.detach(), buffered)

    # Pending text is pushed down to the raw stream when detaching.
    wrapper = self.TextIOWrapper(buffered, encoding="ascii")
    wrapper.write("howdy")
    self.assertFalse(raw.getvalue())
    wrapper.detach()
    self.assertEqual(raw.getvalue(), b"howdy")

    # Detaching a second time fails.
    with self.assertRaises(ValueError):
        wrapper.detach()
1878
def test_repr(self):
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    wrapper = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    def check(template):
        # The repr starts with the module the wrapper type lives in.
        self.assertEqual(repr(wrapper), template % modname)

    check("<%s.TextIOWrapper encoding='utf-8'>")
    # A name set on the raw stream shows up in the repr ...
    raw.name = "dummy"
    check("<%s.TextIOWrapper name='dummy' encoding='utf-8'>")
    # ... as does a mode set on the wrapper ...
    wrapper.mode = "r"
    check("<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>")
    # ... and a bytes name is rendered with its b'' prefix.
    raw.name = b"dummy"
    check("<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>")
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001895
def test_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    wrapper = self.TextIOWrapper(buffered, newline="\n", line_buffering=True)

    # (text written, bytes expected in the raw stream afterwards)
    steps = (
        ("X", b""),                 # No flush happened
        ("Y\nZ", b"XY\nZ"),         # All got flushed
        ("A\rB", b"XY\nZA\rB"),
    )
    for text, flushed in steps:
        wrapper.write(text)
        self.assertEqual(raw.getvalue(), flushed)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001906
Serhiy Storchaka441d30f2013-01-19 12:26:26 +02001907 # Issue 15989
def test_device_encoding(self):
    # A fileno() just past INT_MAX or UINT_MAX must make construction of
    # the wrapper fail with OverflowError.
    raw = self.BytesIO()
    for limit in (_testcapi.INT_MAX, _testcapi.UINT_MAX):
        raw.fileno = lambda limit=limit: limit + 1
        with self.assertRaises(OverflowError):
            self.TextIOWrapper(raw)
1914
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    raw = self.BytesIO()
    explicit = self.TextIOWrapper(raw, encoding="utf8")
    self.assertEqual(explicit.encoding, "utf8")

    # Without an explicit encoding a default must still be filled in ...
    default = self.TextIOWrapper(raw)
    self.assertIsNotNone(default.encoding)
    # ... and it must name a codec that exists (lookup raises otherwise).
    codecs.lookup(default.encoding)
1923
def test_encoding_errors_reading(self):
    payload = b"abc\n\xff\n"

    # (1) default and (2) explicit strict: the bad byte raises.
    for options in ({}, {"errors": "strict"}):
        reader = self.TextIOWrapper(self.BytesIO(payload),
                                    encoding="ascii", **options)
        self.assertRaises(UnicodeError, reader.read)

    # (3) ignore drops the bad byte, (4) replace substitutes U+FFFD.
    lenient = (("ignore", "abc\n\n"), ("replace", "abc\n\ufffd\n"))
    for policy, expected in lenient:
        reader = self.TextIOWrapper(self.BytesIO(payload),
                                    encoding="ascii", errors=policy)
        self.assertEqual(reader.read(), expected)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001941
def test_encoding_errors_writing(self):
    # (1) default and (2) explicit strict: an unencodable character raises.
    for options in ({}, {"errors": "strict"}):
        writer = self.TextIOWrapper(self.BytesIO(), encoding="ascii",
                                    **options)
        self.assertRaises(UnicodeError, writer.write, "\xff")

    # (3) ignore drops the character, (4) replace writes "?" instead.
    lenient = (("ignore", b"abcdef\n"), ("replace", b"abc?def\n"))
    for policy, expected in lenient:
        sink = self.BytesIO()
        writer = self.TextIOWrapper(sink, encoding="ascii", errors=policy,
                                    newline="\n")
        writer.write("abc\xffdef\n")
        writer.flush()
        self.assertEqual(sink.getvalue(), expected)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001965
def test_newlines(self):
    input_lines = ["unix\n", "windows\r\n", "os9\r", "last\n", "nonl"]

    # (newline argument, lines expected when reading the joined input)
    tests = [
        [None, ['unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl']],
        ['', input_lines],
        ['\n', ["unix\n", "windows\r\n", "os9\rlast\n", "nonl"]],
        ['\r\n', ["unix\nwindows\r\n", "os9\rlast\nnonl"]],
        ['\r', ["unix\nwindows\r", "\nos9\r", "last\nnonl"]],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        # str.encode() already returns bytes; no extra conversion needed.
        data = ''.join(input_lines).encode(encoding)
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    if do_reads:
                        # Mix read(2) with readline(): every line is
                        # rebuilt from a two-character prefix plus the
                        # remainder of the line.
                        got_lines = []
                        while True:
                            c2 = textio.read(2)
                            if c2 == '':
                                break
                            self.assertEqual(len(c2), 2)
                            got_lines.append(c2 + textio.readline())
                    else:
                        got_lines = list(textio)

                    # One list comparison checks both content and count,
                    # and reports the full difference on failure.
                    self.assertEqual(got_lines, exp_lines)
Guido van Rossum78892e42007-04-06 17:31:18 +00002007
def test_newlines_input(self):
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    # (newline argument, lines expected from readlines())
    scenarios = [
        (None, normalized.decode("ascii").splitlines(True)),
        ("", testdata.decode("ascii").splitlines(True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, expected in scenarios:
        source = self.BytesIO(testdata)
        reader = self.TextIOWrapper(source, encoding="ascii", newline=newline)
        self.assertEqual(reader.readlines(), expected)
        # A full read() after rewinding must agree with the joined lines.
        reader.seek(0)
        self.assertEqual(reader.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002023
def test_newlines_output(self):
    # Bytes expected on output for each explicit newline setting.
    testdict = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
    }
    # newline=None is expected to behave like the platform's os.linesep.
    tests = [(None, testdict[os.linesep])]
    tests.extend(sorted(testdict.items()))
    pieces = ("AAA\nB", "BB\nCCC\n", "X\rY\r\nZ")
    for newline, expected in tests:
        sink = self.BytesIO()
        writer = self.TextIOWrapper(sink, encoding="ascii", newline=newline)
        for piece in pieces:
            writer.write(piece)
        writer.flush()
        self.assertEqual(sink.closed, False)
        self.assertEqual(sink.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002041
def test_destructor(self):
    # Dropping the last reference to the wrapper must close the underlying
    # stream with the pending text already written to it.
    seen_on_close = []
    parent = self.BytesIO

    class MyBytesIO(parent):
        def close(self):
            # Record the content as it is at close time.
            seen_on_close.append(self.getvalue())
            parent.close(self)

    raw = MyBytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.write("abc")
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_on_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002055
def test_override_destructor(self):
    # Destruction must go through the overridden __del__, close() and
    # flush(), in that order.
    calls = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # The base type may or may not define __del__.
            parent_del = getattr(super(), "__del__", None)
            if parent_del is not None:
                parent_del()

        def close(self):
            calls.append(2)
            super().close()

        def flush(self):
            calls.append(3)
            super().flush()

    raw = self.BytesIO()
    wrapper = MyTextIO(raw, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2078
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()

    def touch_missing_attribute():
        # The temporary wrapper dies inside this frame.
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    report = captured.getvalue().strip()
    if not report:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(report.splitlines()), 1)
    self.assertTrue(report.startswith("Exception IOError: "), report)
    self.assertTrue(report.endswith(" ignored"), report)
Guido van Rossum8358db22007-08-18 21:39:55 +00002093
Guido van Rossum9b76da62007-04-11 01:09:03 +00002094 # Systematic tests of the text I/O API
2095
def test_basic_io(self):
    # Exercise write/read/seek/tell round trips on a real file for several
    # chunk sizes and encodings.  The files are opened in ``with`` blocks so
    # a failing assertion cannot leave the scratch file open.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin1", "utf8":  # , "utf-16-be", "utf-16-le":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)

            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position cookie for the end of the initial content.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                # Append, then read the appended text back via the cookie.
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2124
def multi_line_test(self, f, enc):
    # Rewrite f from scratch with lines of many different lengths, noting
    # tell() before each write, then read everything back and check that
    # the (position, line) pairs match.  ``enc`` is accepted but not used.
    f.seek(0)
    f.truncate()
    alphabet = "s\xff\u0fff\uffff"
    period = len(alphabet)
    written = []
    for length in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        text = "".join([alphabet[k % period] for k in range(length)]) + "\n"
        written.append((f.tell(), text))
        f.write(text)
    f.seek(0)
    observed = []
    offset = f.tell()
    text = f.readline()
    while text:
        observed.append((offset, text))
        offset = f.tell()
        text = f.readline()
    self.assertEqual(observed, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002146
def test_telling(self):
    # tell() cookies taken while writing must match those seen while
    # reading the same lines back; tell() must fail during iteration and
    # work again once iteration has finished.
    #
    # The file is opened with ``with`` so that a failing assertion does
    # not leave an open handle on support.TESTFN behind.
    with self.open(support.TESTFN, "w+", encoding="utf8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is expected to raise while iterating with next().
            self.assertRaises(IOError, f.tell)
        self.assertEqual(f.tell(), p2)
2166
def test_seeking(self):
    # Build a line whose multi-byte suffix starts two characters before the
    # default chunk size, then check read(), tell() and readline() around it.
    prefix_size = _default_chunk_size() - 2
    u_prefix = "a" * prefix_size
    u_suffix = "\u8888\n"
    prefix = bytes(u_prefix.encode("utf-8"))
    # The prefix is pure ASCII: one byte per character.
    self.assertEqual(len(u_prefix), len(prefix))
    suffix = bytes(u_suffix.encode("utf-8"))
    with self.open(support.TESTFN, "wb") as out:
        out.write((prefix + suffix) * 2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as src:
        head = src.read(prefix_size)
        self.assertEqual(head, str(prefix, "ascii"))
        self.assertEqual(src.tell(), prefix_size)
        self.assertEqual(src.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002183
def test_seeking_too(self):
    # Regression test for a specific bug
    payload = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as out:
        out.write(payload)
    with self.open(support.TESTFN, "r", encoding="utf-8") as src:
        src._CHUNK_SIZE  # Just test that it exists
        src._CHUNK_SIZE = 2
        # Neither call's result is checked; they only have to not raise.
        src.readline()
        src.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002194
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    #
    # All files are opened with ``with`` so that a failing assertion does
    # not leave an open handle on support.TESTFN behind.

    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def check_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    # Seeking back to the cookie must restore the state.
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for encoded, _, _ in StatefulIncrementalDecoderTest.test_cases:
            check_seek_and_tell_with_data(encoded)

        # Position each test case so that it crosses a chunk boundary.
        for encoded, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(encoded)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            check_seek_and_tell_with_data(prefix + encoded, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002241
def test_encoded_writes(self):
    # Two consecutive writes through a BOM-emitting codec must produce the
    # same bytes as encoding the concatenated text in one go.
    payload = "1234567890"
    doubled = payload * 2
    codecs_under_test = (
        "utf-16",
        "utf-16-le",
        "utf-16-be",
        "utf-32",
        "utf-32-le",
        "utf-32-be",
    )
    for codec in codecs_under_test:
        raw = self.BytesIO()
        txt = self.TextIOWrapper(raw, encoding=codec)
        # Check if the BOM is written only once (see issue1753).
        txt.write(payload)
        txt.write(payload)
        # Read the whole stream back twice, rewinding before each read.
        for _ in range(2):
            txt.seek(0)
            self.assertEqual(txt.read(), doubled)
        self.assertEqual(raw.getvalue(), doubled.encode(codec))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002261
def test_unreadable(self):
    # read() on a wrapper whose buffer reports readable() == False must
    # raise IOError.
    class WriteOnlyBytesIO(self.BytesIO):
        def readable(self):
            return False

    wrapper = self.TextIOWrapper(WriteOnlyBytesIO())
    self.assertRaises(IOError, wrapper.read)
2268
def test_read_one_by_one(self):
    # Reading one character at a time must still translate "\r\n" to "\n".
    txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    # iter() with a sentinel keeps calling read(1) until it returns "".
    collected = "".join(iter(lambda: txt.read(1), ""))
    self.assertEqual(collected, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002278
def test_readlines(self):
    # readlines() with no hint, a None hint and a small size hint.
    txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
    every_line = ["AA\n", "BB\n", "CC"]
    self.assertEqual(txt.readlines(), every_line)
    txt.seek(0)
    self.assertEqual(txt.readlines(None), every_line)
    txt.seek(0)
    # With a hint of 5 only the first two lines are returned.
    self.assertEqual(txt.readlines(5), every_line[:2])
2286
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002287 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
def test_read_by_chunk(self):
    # make sure "\r\n" straddles 128 char boundary.
    txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
    # iter() with a sentinel keeps calling read(128) until it returns "".
    collected = "".join(iter(lambda: txt.read(128), ""))
    self.assertEqual(collected, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002298
def test_writelines(self):
    # writelines() with a plain list writes the items back to back.
    sink = self.BytesIO()
    wrapper = self.TextIOWrapper(sink)
    wrapper.writelines(['ab', 'cd', 'ef'])
    wrapper.flush()
    self.assertEqual(sink.getvalue(), b'abcdef')
2306
def test_writelines_userlist(self):
    # writelines() must also accept a UserList, not only a real list.
    sink = self.BytesIO()
    wrapper = self.TextIOWrapper(sink)
    wrapper.writelines(UserList(['ab', 'cd', 'ef']))
    wrapper.flush()
    self.assertEqual(sink.getvalue(), b'abcdef')
2314
def test_writelines_error(self):
    # writelines() must reject non-str items, None and a bytes object.
    wrapper = self.TextIOWrapper(self.BytesIO())
    for bad_argument in ([1, 2, 3], None, b'abc'):
        self.assertRaises(TypeError, wrapper.writelines, bad_argument)
2320
def test_issue1395_1(self):
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")

    # read one char at a time
    # (iter() with a sentinel stops at the first empty string)
    collected = "".join(iter(lambda: wrapper.read(1), ""))
    self.assertEqual(collected, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002332
def test_issue1395_2(self):
    # Read four characters at a time with a matching chunk size of four.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # iter() with a sentinel stops at the first empty string.
    collected = "".join(iter(lambda: wrapper.read(4), ""))
    self.assertEqual(collected, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002344
def test_issue1395_3(self):
    # Mix two read(4) calls with three readline() calls.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    pieces = [wrapper.read(4), wrapper.read(4)]
    for _ in range(3):
        pieces.append(wrapper.readline())
    self.assertEqual("".join(pieces), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002355
def test_issue1395_4(self):
    # A read(4) followed by an unbounded read() returns everything.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    head = wrapper.read(4)
    tail = wrapper.read()
    self.assertEqual(head + tail, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002363
def test_issue1395_5(self):
    # Seeking away and back to a tell() cookie must resume reading at the
    # same place.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    wrapper.read(4)  # advance past the first four characters
    cookie = wrapper.tell()
    wrapper.seek(0)
    wrapper.seek(cookie)
    self.assertEqual(wrapper.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002373
def test_issue2282(self):
    # The wrapper must report the same seekable() value as its buffer.
    raw = self.BytesIO(self.testdata)
    wrapper = self.TextIOWrapper(raw, encoding="ascii")

    self.assertEqual(raw.seekable(), wrapper.seekable())
2379
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as out:
            out.write('aaa')
            # The cookie is not needed here; the call is kept so the
            # sequence of operations on the file stays the same.
            out.tell()
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), 'aaa'.encode(charset))

        with self.open(path, 'a', encoding=charset) as out:
            out.write('xxx')
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002394
def test_seek_bom(self):
    # Same test, but when seeking manually
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as out:
            out.write('aaa')
            end_cookie = out.tell()
        with self.open(path, 'r+', encoding=charset) as update:
            # Write at the old end first, then overwrite from the start.
            update.seek(end_cookie)
            update.write('zzz')
            update.seek(0)
            update.write('bbb')
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002409
def test_errors_property(self):
    # .errors is "strict" when not given and echoes an explicit value.
    scenarios = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for extra, expected in scenarios:
        with self.open(support.TESTFN, "w", **extra) as f:
            self.assertEqual(f.errors, expected)
2415
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    start_signal = threading.Event()
    worker_count = 20
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def write_line(n):
            text = "Thread%03d\n" % n
            # Block until every worker has been started.
            start_signal.wait()
            f.write(text)

        workers = [threading.Thread(target=write_line, args=(x,))
                   for x in range(worker_count)]
        for worker in workers:
            worker.start()
        time.sleep(0.02)
        start_signal.set()
        for worker in workers:
            worker.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
        # Every worker's line must appear exactly once.
        for n in range(worker_count):
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002437
def test_flush_error_on_close(self):
    # An error raised by flush() during close() must reach the caller.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")

    def failing_flush():
        raise IOError()

    wrapper.flush = failing_flush
    with self.assertRaises(IOError):  # exception not swallowed
        wrapper.close()
2444
def test_multi_close(self):
    # close() may be called repeatedly; flush() afterwards must fail.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    for _ in range(3):
        wrapper.close()
    with self.assertRaises(ValueError):
        wrapper.flush()
2451
def test_unseekable(self):
    # tell() and seek() over an unseekable raw stream must raise
    # UnsupportedOperation.
    wrapper = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.tell()
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.seek(0)
2456
def test_readonly_attributes(self):
    # Assigning to .buffer on a wrapper must raise AttributeError.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    replacement = self.BytesIO(self.testdata)
    self.assertRaises(AttributeError, setattr, wrapper, "buffer",
                      replacement)
2462
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    wrapper = self.TextIOWrapper(self.MockRawIO(chunks),
                                 encoding='ascii', newline='\n')
    # Reads
    self.assertEqual(wrapper.read(4), 'abcd')
    self.assertEqual(wrapper.readline(), 'efghi\n')
    remaining = list(wrapper)
    self.assertEqual(remaining, ['jkl\n', 'opq\n'])
2473
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                                 write_through=True)
    for fragment in ('1', '23\n4', '5'):
        wrapper.write(fragment)
    # No flush() was called, yet the raw object has seen every fragment.
    self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2483
class CTextIOWrapperTest(TextIOWrapperTest):

    def test_initialization(self):
        # A failed re-initialization must leave the wrapper unusable:
        # read() raises ValueError afterwards.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        self.assertRaises(TypeError, wrapper.__init__, buffered, newline=42)
        self.assertRaises(ValueError, wrapper.read)
        self.assertRaises(ValueError, wrapper.__init__, buffered,
                          newline='xyzzy')
        self.assertRaises(ValueError, wrapper.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        rawio = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(rawio)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        # Self-reference: only the cyclic collector can free the wrapper.
        wrapper.x = wrapper
        ref = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertTrue(ref() is None, ref)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references
            text_a.buddy = text_b
            text_b.buddy = text_a
        support.gc_collect()
2524
2525
# Adds no tests of its own: everything is inherited from TextIOWrapperTest.
class PyTextIOWrapperTest(TextIOWrapperTest):
    pass
2528
2529
class IncrementalNewlineDecoderTest(unittest.TestCase):

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def verify(data, expected, **options):
            # We exercise getstate() / setstate() as well as decode():
            # decode, restore the saved state, and decode again.
            saved = decoder.getstate()
            self.assertEqual(decoder.decode(data, **options), expected)
            decoder.setstate(saved)
            self.assertEqual(decoder.decode(data, **options), expected)

        # Multi-byte characters, whole and split across calls; the last
        # step leaves an incomplete sequence pending in the decoder.
        multibyte_steps = (
            (b'\xe8\xa2\x88', "\u8888"),

            (b'\xe8', ""),
            (b'\xa2', ""),
            (b'\x88', "\u8888"),

            (b'\xe8', ""),
            (b'\xa2', ""),
            (b'\x88', "\u8888"),

            (b'\xe8', ""),
        )
        for data, expected in multibyte_steps:
            verify(data, expected)
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        # Newline handling, including a "\r" held back between calls.
        newline_steps = (
            (b'\n', "\n", {}),
            (b'\r', "", {}),
            (b'', "\n", {"final": True}),
            (b'\r', "\n", {"final": True}),

            (b'\r', "", {}),
            (b'a', "\na", {}),

            (b'\r\r\n', "\n\n", {}),
            (b'\r', "", {}),
            (b'\r', "\n", {}),
            (b'\na', "\na", {}),

            (b'\xe8\xa2\x88\r\n', "\u8888\n", {}),
            (b'\xe8\xa2\x88', "\u8888", {}),
            (b'\n', "\n", {}),
            (b'\xe8\xa2\x88\r', "\u8888", {}),
            (b'\n', "\n", {}),
        )
        for data, expected, options in newline_steps:
            verify(data, expected, **options)

    def check_newline_decoding(self, decoder, encoding):
        # Feed the decoder one unit at a time and check both the decoded
        # text and the evolving .newlines attribute.
        decoded = []
        if encoding is not None:
            encoder = codecs.getincrementalencoder(encoding)()

            def feed(text):
                # Decode one byte at a time
                for value in encoder.encode(text):
                    decoded.append(decoder.decode(bytes([value])))
        else:
            encoder = None

            def feed(text):
                # Decode one char at a time
                for char in text:
                    decoded.append(decoder.decode(char))

        self.assertEqual(decoder.newlines, None)
        staged = (
            ("abc\n\r", '\n'),
            ("\nabc", ('\n', '\r\n')),
            ("abc\r", ('\n', '\r\n')),
            ("abc", ('\r', '\n', '\r\n')),
        )
        for text, seen_newlines in staged:
            feed(text)
            self.assertEqual(decoder.newlines, seen_newlines)
        feed("abc\r")
        self.assertEqual("".join(decoded), "abc\n\nabcabc\nabcabc")
        decoder.reset()
        sample = "abc"
        if encoder is not None:
            encoder.reset()
            sample = encoder.encode(sample)
        self.assertEqual(decoder.decode(sample), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            inner = enc and codecs.getincrementaldecoder(enc)()
            wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(wrapped, enc)
        inner = codecs.getincrementaldecoder("utf-8")()
        wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding_utf8(wrapped)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def verify(dec):
            # Neither character is a newline, so .newlines must stay None.
            self.assertEqual(dec.newlines, None)
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)

        for translate in (False, True):
            verify(self.IncrementalNewlineDecoder(None, translate=translate))
2635
# Adds no tests of its own: everything is inherited from
# IncrementalNewlineDecoderTest.
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    pass
2638
# Adds no tests of its own: everything is inherited from
# IncrementalNewlineDecoderTest.
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002641
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002642
Guido van Rossum01a27522007-03-07 01:00:12 +00002643# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002644
Guido van Rossum5abbf752007-08-27 17:39:33 +00002645class MiscIOTest(unittest.TestCase):
2646
def tearDown(self):
    # Remove the scratch file that the tests in this class open.
    support.unlink(support.TESTFN)
2649
def test___all__(self):
    # Every name in __all__ must exist; apart from open() and the SEEK_*
    # constants, each is either an exception class or an IOBase subclass.
    for name in self.io.__all__:
        exported = getattr(self.io, name, None)
        self.assertTrue(exported is not None, name)
        if name == "open":
            continue
        is_exception_name = ("error" in name.lower()
                             or name == "UnsupportedOperation")
        if is_exception_name:
            self.assertTrue(issubclass(exported, Exception), name)
        elif not name.startswith("SEEK_"):
            self.assertTrue(issubclass(exported, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002660
def test_attributes(self):
    # Check the .mode and .name attributes at each layer of the objects
    # returned by open() for several modes.
    #
    # All files are opened with ``with`` so that a failing assertion does
    # not leave an open handle on support.TESTFN behind.
    with self.open(support.TESTFN, "wb", buffering=0) as f:
        self.assertEqual(f.mode, "wb")

    with self.open(support.TESTFN, "U") as f:
        self.assertEqual(f.name, support.TESTFN)
        self.assertEqual(f.buffer.name, support.TESTFN)
        self.assertEqual(f.buffer.raw.name, support.TESTFN)
        self.assertEqual(f.mode, "U")
        self.assertEqual(f.buffer.mode, "rb")
        self.assertEqual(f.buffer.raw.mode, "rb")

    with self.open(support.TESTFN, "w+") as f:
        self.assertEqual(f.mode, "w+")
        self.assertEqual(f.buffer.mode, "rb+")  # Does it really matter?
        self.assertEqual(f.buffer.raw.mode, "rb+")

        # A second object on the same descriptor: its name is the
        # descriptor number, and closefd=False leaves closing it to f.
        with self.open(f.fileno(), "wb", closefd=False) as g:
            self.assertEqual(g.mode, "wb")
            self.assertEqual(g.raw.mode, "wb")
            self.assertEqual(g.name, f.fileno())
            self.assertEqual(g.raw.name, f.fileno())
2687
def test_io_after_close(self):
    # Every I/O method of a closed file must raise ValueError, whatever
    # the mode and buffering it was opened with.
    configurations = []
    for text_mode in ("w", "r", "w+"):
        binary_mode = text_mode + "b"
        configurations.extend([
            {"mode": text_mode},
            {"mode": binary_mode},
            {"mode": text_mode, "buffering": 1},
            {"mode": text_mode, "buffering": 2},
            {"mode": binary_mode, "buffering": 0},
        ])
    for kwargs in configurations:
        f = self.open(support.TESTFN, **kwargs)
        f.close()
        empty = b"" if "b" in kwargs['mode'] else ""
        # (method name, arguments, only checked when the method exists)
        probes = (
            ("flush", (), False),
            ("fileno", (), False),
            ("isatty", (), False),
            ("__iter__", (), False),
            ("peek", (1,), True),
            ("read", (), False),
            ("read1", (1024,), True),
            ("readall", (), True),
            ("readinto", (bytearray(1024),), True),
            ("readline", (), False),
            ("readlines", (), False),
            ("seek", (0,), False),
            ("tell", (), False),
            ("truncate", (), False),
            ("write", (empty,), False),
            ("writelines", ([],), False),
        )
        for method, args, optional in probes:
            if optional and not hasattr(f, method):
                continue
            self.assertRaises(ValueError, getattr(f, method), *args)
        self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002730
def test_blockingioerror(self):
    # Various BlockingIOError issues
    rejected_signatures = (
        (),
        (1,),
        (1, 2, 3, 4),
        (1, "", None),
    )
    for bad_args in rejected_signatures:
        self.assertRaises(TypeError, self.BlockingIOError, *bad_args)
    self.assertEqual(self.BlockingIOError(1, "").characters_written, 0)

    class AttributedStr(str):
        pass

    # Put the exception and its message in a reference cycle and check
    # that the message is gone after a collection.
    message = AttributedStr("")
    error = self.BlockingIOError(1, message)
    message.b = error
    error.c = message
    ref = weakref.ref(message)
    del message, error
    support.gc_collect()
    self.assertTrue(ref() is None, ref)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002749
2750 def test_abcs(self):
2751 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002752 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2753 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2754 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2755 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002756
def _check_abc_inheritance(self, abcmodule):
    # Open the test file as a raw, a buffered and a text object in turn.
    # Each object must be an IOBase and an instance of exactly one of the
    # three layer ABCs looked up on *abcmodule*.
    layers = ("RawIOBase", "BufferedIOBase", "TextIOBase")
    scenarios = (
        ("wb", {"buffering": 0}, "RawIOBase"),
        ("wb", {}, "BufferedIOBase"),
        ("w", {}, "TextIOBase"),
    )
    for mode, extra, expected_layer in scenarios:
        with self.open(support.TESTFN, mode, **extra) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            for layer in layers:
                if layer == expected_layer:
                    self.assertIsInstance(f, getattr(abcmodule, layer))
                else:
                    self.assertNotIsInstance(f, getattr(abcmodule, layer))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002773
def test_abc_inheritance(self):
    # Test implementations inherit from their respective ABCs.  The test
    # case itself is passed as the "module": the ABCs are looked up as
    # attributes of self.
    self._check_abc_inheritance(self)
2777
def test_abc_inheritance_official(self):
    # Test implementations inherit from the official ABCs of the
    # baseline "io" module, whichever implementation self.open comes from.
    self._check_abc_inheritance(io)
2782
def _check_warn_on_dealloc(self, *args, **kwargs):
    # Open a file with the given arguments, drop the only reference to it
    # without closing it, and check that a ResourceWarning whose message
    # contains the object's repr() is emitted.
    # NOTE(review): this calls open(), not self.open, so the C and Python
    # test variants appear to exercise the same implementation here --
    # confirm whether that is intended.
    f = open(*args, **kwargs)
    r = repr(f)
    with self.assertWarns(ResourceWarning) as cm:
        # Rebinding f releases the last reference; the explicit collection
        # is there for implementations without immediate finalization.
        f = None
        support.gc_collect()
    self.assertIn(r, str(cm.warning.args[0]))
2790
def test_warn_on_dealloc(self):
    # Unbuffered binary, buffered binary and text files must all warn
    # when they are deallocated without having been closed.
    for mode, extra in (("wb", {"buffering": 0}), ("wb", {}), ("w", {})):
        self._check_warn_on_dealloc(support.TESTFN, mode, **extra)
2795
def _check_warn_on_dealloc_fd(self, *args, **kwargs):
    # Same check as _check_warn_on_dealloc, but for a file object wrapping
    # the read end of a pipe, followed by the closefd=False case where no
    # warning is expected.
    fds = []
    def cleanup_fds():
        # Close every descriptor created below; EBADF only means the file
        # object already closed it, so that error is ignored.
        for fd in fds:
            try:
                os.close(fd)
            except EnvironmentError as e:
                if e.errno != errno.EBADF:
                    raise
    self.addCleanup(cleanup_fds)
    r, w = os.pipe()
    fds += r, w
    self._check_warn_on_dealloc(r, *args, **kwargs)
    # When using closefd=False, there's no warning
    r, w = os.pipe()
    fds += r, w
    with warnings.catch_warnings(record=True) as recorded:
        # The file object is never bound to a name, so it becomes garbage
        # as soon as the call returns.
        open(r, *args, closefd=False, **kwargs)
        support.gc_collect()
    self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00002816
def test_warn_on_dealloc_fd(self):
    # Run the descriptor-based check for unbuffered binary, buffered
    # binary and text readers.
    for mode, extra in (("rb", {"buffering": 0}), ("rb", {}), ("r", {})):
        self._check_warn_on_dealloc_fd(mode, **extra)
2821
2822
def test_pickling(self):
    # Pickling file objects is forbidden, whatever the mode, the
    # buffering and the pickle protocol.
    protocols = range(pickle.HIGHEST_PROTOCOL + 1)
    # The write modes come first so that the file exists by the time it
    # is opened for reading.
    for base_mode in ("w", "r", "w+"):
        binary_mode = base_mode + "b"
        variants = (
            {"mode": base_mode},
            {"mode": binary_mode},
            {"mode": binary_mode, "buffering": 0},
        )
        for kwargs in variants:
            for protocol in protocols:
                with self.open(support.TESTFN, **kwargs) as f:
                    self.assertRaises(TypeError, pickle.dumps, f, protocol)
2839
@unittest.skipUnless(fcntl, 'fcntl required for this test')
def test_nonblock_pipe_write_bigbuf(self):
    # Non-blocking pipe scenario with a 16 KiB buffer, i.e. larger than
    # the 4096-byte _PIPE_BUF figure quoted in _test_nonblock_pipe_write.
    self._test_nonblock_pipe_write(16*1024)
2843
@unittest.skipUnless(fcntl, 'fcntl required for this test')
def test_nonblock_pipe_write_smallbuf(self):
    # Non-blocking pipe scenario with a 1 KiB buffer, i.e. smaller than
    # the 4096-byte _PIPE_BUF figure quoted in _test_nonblock_pipe_write.
    self._test_nonblock_pipe_write(1024)
2847
2848 def _set_non_blocking(self, fd):
2849 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
2850 self.assertNotEqual(flags, -1)
2851 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
2852 self.assertEqual(res, 0)
2853
def _test_nonblock_pipe_write(self, bufsize):
    # Push data through a non-blocking pipe using file objects opened with
    # the given buffer size, and check that what comes out of the read end
    # is exactly what was reported as written on the write end.
    sent = []
    received = []
    r, w = os.pipe()
    self._set_non_blocking(r)
    self._set_non_blocking(w)

    # To exercise all code paths in the C implementation we need
    # to play with buffer sizes.  For instance, if we choose a
    # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
    # then we will never get a partial write of the buffer.
    rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
    wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

    with rf, wf:
        for N in 9999, 73, 7574:
            try:
                # Keep writing N-byte messages until the writer reports
                # that it would block.
                i = 0
                while True:
                    msg = bytes([i % 26 + 97]) * N
                    sent.append(msg)
                    wf.write(msg)
                    i += 1

            except self.BlockingIOError as e:
                self.assertEqual(e.args[0], errno.EAGAIN)
                # Only the part reported as written counts as sent.
                sent[-1] = sent[-1][:e.characters_written]
                # Read whatever is available, then write a short marker.
                received.append(rf.read())
                msg = b'BLOCKED'
                wf.write(msg)
                sent.append(msg)

        # Flush the writer, reading from the other end each time the flush
        # reports that it would block.
        while True:
            try:
                wf.flush()
                break
            except self.BlockingIOError as e:
                self.assertEqual(e.args[0], errno.EAGAIN)
                self.assertEqual(e.characters_written, 0)
                received.append(rf.read())

        # Collect the remaining data; iteration stops when read()
        # returns None.
        received += iter(rf.read, None)

    sent, received = b''.join(sent), b''.join(received)
    self.assertTrue(sent == received)
    # Leaving the with block must have closed both file objects.
    self.assertTrue(wf.closed)
    self.assertTrue(rf.closed)
2901
# MiscIOTest variant bound to the module named io.  test_main() fills in
# the remaining attributes of every test class whose name starts with "C".
class CMiscIOTest(MiscIOTest):
    io = io
2904
# MiscIOTest variant bound to the module named pyio (presumably the
# pure-Python implementation -- its import is not visible here).
# test_main() fills in the remaining attributes of every test class whose
# name starts with "Py".
class PyMiscIOTest(MiscIOTest):
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002907
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002908
2909@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2910class SignalsTest(unittest.TestCase):
2911
2912 def setUp(self):
2913 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2914
2915 def tearDown(self):
2916 signal.signal(signal.SIGALRM, self.oldalrm)
2917
2918 def alarm_interrupt(self, sig, frame):
2919 1/0
2920
2921 @unittest.skipUnless(threading, 'Threading required for this test.')
Victor Stinnercd1aa0d2011-07-04 11:48:17 +02002922 @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
2923 'issue #12429: skip test on FreeBSD <= 7')
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002924 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2925 """Check that a partial write, when it gets interrupted, properly
Antoine Pitroud843c2d2011-02-25 21:34:39 +00002926 invokes the signal handler, and bubbles up the exception raised
2927 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002928 read_results = []
2929 def _read():
2930 s = os.read(r, 1)
2931 read_results.append(s)
2932 t = threading.Thread(target=_read)
2933 t.daemon = True
2934 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00002935 fdopen_kwargs["closefd"] = False
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002936 try:
2937 wio = self.io.open(w, **fdopen_kwargs)
2938 t.start()
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07002939 signal.alarm(1)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002940 # Fill the pipe enough that the write will be blocking.
2941 # It will be interrupted by the timer armed above. Since the
2942 # other thread has read one byte, the low-level write will
2943 # return with a successful (partial) result rather than an EINTR.
2944 # The buffered IO layer must check for pending signal
2945 # handlers, which in this case will invoke alarm_interrupt().
2946 self.assertRaises(ZeroDivisionError,
2947 wio.write, item * (1024 * 1024))
2948 t.join()
2949 # We got one byte, get another one and check that it isn't a
2950 # repeat of the first one.
2951 read_results.append(os.read(r, 1))
2952 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2953 finally:
2954 os.close(w)
2955 os.close(r)
2956 # This is deliberate. If we didn't close the file descriptor
2957 # before closing wio, wio would try to flush its internal
2958 # buffer, and block again.
2959 try:
2960 wio.close()
2961 except IOError as e:
2962 if e.errno != errno.EBADF:
2963 raise
2964
2965 def test_interrupted_write_unbuffered(self):
2966 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2967
2968 def test_interrupted_write_buffered(self):
2969 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2970
2971 def test_interrupted_write_text(self):
2972 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2973
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002974 def check_reentrant_write(self, data, **fdopen_kwargs):
2975 def on_alarm(*args):
2976 # Will be called reentrantly from the same thread
2977 wio.write(data)
2978 1/0
2979 signal.signal(signal.SIGALRM, on_alarm)
2980 r, w = os.pipe()
2981 wio = self.io.open(w, **fdopen_kwargs)
2982 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07002983 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002984 # Either the reentrant call to wio.write() fails with RuntimeError,
2985 # or the signal handler raises ZeroDivisionError.
2986 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
2987 while 1:
2988 for i in range(100):
2989 wio.write(data)
2990 wio.flush()
2991 # Make sure the buffer doesn't fill up and block further writes
2992 os.read(r, len(data) * 100)
2993 exc = cm.exception
2994 if isinstance(exc, RuntimeError):
2995 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
2996 finally:
2997 wio.close()
2998 os.close(r)
2999
3000 def test_reentrant_write_buffered(self):
3001 self.check_reentrant_write(b"xy", mode="wb")
3002
3003 def test_reentrant_write_text(self):
3004 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3005
Antoine Pitroud843c2d2011-02-25 21:34:39 +00003006 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3007 """Check that a buffered read, when it gets interrupted (either
3008 returning a partial result or EINTR), properly invokes the signal
3009 handler and retries if the latter returned successfully."""
3010 r, w = os.pipe()
3011 fdopen_kwargs["closefd"] = False
3012 def alarm_handler(sig, frame):
3013 os.write(w, b"bar")
3014 signal.signal(signal.SIGALRM, alarm_handler)
3015 try:
3016 rio = self.io.open(r, **fdopen_kwargs)
3017 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003018 signal.alarm(1)
Antoine Pitroud843c2d2011-02-25 21:34:39 +00003019 # Expected behaviour:
3020 # - first raw read() returns partial b"foo"
3021 # - second raw read() returns EINTR
3022 # - third raw read() returns b"bar"
3023 self.assertEqual(decode(rio.read(6)), "foobar")
3024 finally:
3025 rio.close()
3026 os.close(w)
3027 os.close(r)
3028
Antoine Pitrou20db5112011-08-19 20:32:34 +02003029 def test_interrupted_read_retry_buffered(self):
Antoine Pitroud843c2d2011-02-25 21:34:39 +00003030 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3031 mode="rb")
3032
Antoine Pitrou20db5112011-08-19 20:32:34 +02003033 def test_interrupted_read_retry_text(self):
Antoine Pitroud843c2d2011-02-25 21:34:39 +00003034 self.check_interrupted_read_retry(lambda x: x,
3035 mode="r")
3036
3037 @unittest.skipUnless(threading, 'Threading required for this test.')
3038 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3039 """Check that a buffered write, when it gets interrupted (either
3040 returning a partial result or EINTR), properly invokes the signal
3041 handler and retries if the latter returned successfully."""
3042 select = support.import_module("select")
3043 # A quantity that exceeds the buffer size of an anonymous pipe's
3044 # write end.
3045 N = 1024 * 1024
3046 r, w = os.pipe()
3047 fdopen_kwargs["closefd"] = False
3048 # We need a separate thread to read from the pipe and allow the
3049 # write() to finish. This thread is started after the SIGALRM is
3050 # received (forcing a first EINTR in write()).
3051 read_results = []
3052 write_finished = False
3053 def _read():
3054 while not write_finished:
3055 while r in select.select([r], [], [], 1.0)[0]:
3056 s = os.read(r, 1024)
3057 read_results.append(s)
3058 t = threading.Thread(target=_read)
3059 t.daemon = True
3060 def alarm1(sig, frame):
3061 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003062 signal.alarm(1)
Antoine Pitroud843c2d2011-02-25 21:34:39 +00003063 def alarm2(sig, frame):
3064 t.start()
3065 signal.signal(signal.SIGALRM, alarm1)
3066 try:
3067 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003068 signal.alarm(1)
Antoine Pitroud843c2d2011-02-25 21:34:39 +00003069 # Expected behaviour:
3070 # - first raw write() is partial (because of the limited pipe buffer
3071 # and the first alarm)
3072 # - second raw write() returns EINTR (because of the second alarm)
3073 # - subsequent write()s are successful (either partial or complete)
3074 self.assertEqual(N, wio.write(item * N))
3075 wio.flush()
3076 write_finished = True
3077 t.join()
3078 self.assertEqual(N, sum(len(x) for x in read_results))
3079 finally:
3080 write_finished = True
3081 os.close(w)
3082 os.close(r)
3083 # This is deliberate. If we didn't close the file descriptor
3084 # before closing wio, wio would try to flush its internal
3085 # buffer, and could block (in case of failure).
3086 try:
3087 wio.close()
3088 except IOError as e:
3089 if e.errno != errno.EBADF:
3090 raise
3091
Antoine Pitrou20db5112011-08-19 20:32:34 +02003092 def test_interrupted_write_retry_buffered(self):
Antoine Pitroud843c2d2011-02-25 21:34:39 +00003093 self.check_interrupted_write_retry(b"x", mode="wb")
3094
Antoine Pitrou20db5112011-08-19 20:32:34 +02003095 def test_interrupted_write_retry_text(self):
Antoine Pitroud843c2d2011-02-25 21:34:39 +00003096 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3097
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003098
# SignalsTest variant whose self.io.open() comes from the module named io.
class CSignalsTest(SignalsTest):
    io = io
3101
# SignalsTest variant whose self.io.open() comes from the module named pyio.
class PySignalsTest(SignalsTest):
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.  Rebinding the inherited names to None keeps
    # them from being run for this class.
    test_reentrant_write_buffered = None
    test_reentrant_write_text = None
3109
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003110
def test_main():
    # Give every test class the namespace of the implementation it targets
    # (chosen from the "C" / "Py" prefix of its name), then run them all.
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    exported_names = io.__all__ + ["IncrementalNewlineDecoder"]
    module_globals = globals()

    def build_namespace(module, prefix):
        # Public names of *module* plus the mock classes whose global name
        # is *prefix* followed by the mock's own name.
        namespace = {name: getattr(module, name) for name in exported_names}
        namespace.update((mock.__name__, module_globals[prefix + mock.__name__])
                         for mock in mocks)
        return namespace

    c_io_ns = build_namespace(io, "C")
    py_io_ns = build_namespace(pyio, "Py")
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper

    for test in tests:
        test_name = test.__name__
        if test_name.startswith("C"):
            namespace = c_io_ns
        elif test_name.startswith("Py"):
            namespace = py_io_ns
        else:
            # Implementation-independent test: nothing to inject.
            continue
        for name, obj in namespace.items():
            setattr(test, name, obj)

    support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00003145
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()