blob: 56ac2c8a792b18d17602040ed105e23bb596a43a [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import random
Guido van Rossum28524c72007-02-27 05:47:44 +000027import unittest
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000028import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000029import abc
Antoine Pitroub46b9d52010-08-21 19:09:32 +000030import signal
31import errno
Antoine Pitroue033e062010-10-29 10:38:18 +000032import warnings
Antoine Pitrou243757e2010-11-05 21:15:39 +000033import pickle
Georg Brandl1b37e872010-03-14 10:45:50 +000034from itertools import cycle, count
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Benjamin Petersonee8712c2008-05-20 21:35:26 +000036from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000037
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000038import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000039import io # C implementation of io
40import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000041try:
42 import threading
43except ImportError:
44 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010045try:
46 import fcntl
47except ImportError:
48 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000049
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000050def _default_chunk_size():
51 """Get the default TextIOWrapper chunk size"""
52 with open(__file__, "r", encoding="latin1") as f:
53 return f._CHUNK_SIZE
54
55
Antoine Pitrou328ec742010-09-14 18:37:24 +000056class MockRawIOWithoutRead:
57 """A RawIO implementation without read(), so as to exercise the default
58 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000059
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000060 def __init__(self, read_stack=()):
61 self._read_stack = list(read_stack)
62 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000063 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000064 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000065
Guido van Rossum01a27522007-03-07 01:00:12 +000066 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000067 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000068 return len(b)
69
70 def writable(self):
71 return True
72
Guido van Rossum68bbcd22007-02-27 17:19:33 +000073 def fileno(self):
74 return 42
75
76 def readable(self):
77 return True
78
Guido van Rossum01a27522007-03-07 01:00:12 +000079 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000080 return True
81
Guido van Rossum01a27522007-03-07 01:00:12 +000082 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000083 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000084
85 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000086 return 0 # same comment as above
87
88 def readinto(self, buf):
89 self._reads += 1
90 max_len = len(buf)
91 try:
92 data = self._read_stack[0]
93 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +000094 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000095 return 0
96 if data is None:
97 del self._read_stack[0]
98 return None
99 n = len(data)
100 if len(data) <= max_len:
101 del self._read_stack[0]
102 buf[:n] = data
103 return n
104 else:
105 buf[:] = data[:max_len]
106 self._read_stack[0] = data[max_len:]
107 return max_len
108
109 def truncate(self, pos=None):
110 return pos
111
Antoine Pitrou328ec742010-09-14 18:37:24 +0000112class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
113 pass
114
115class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
116 pass
117
118
119class MockRawIO(MockRawIOWithoutRead):
120
121 def read(self, n=None):
122 self._reads += 1
123 try:
124 return self._read_stack.pop(0)
125 except:
126 self._extraneous_reads += 1
127 return b""
128
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000129class CMockRawIO(MockRawIO, io.RawIOBase):
130 pass
131
132class PyMockRawIO(MockRawIO, pyio.RawIOBase):
133 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000134
Guido van Rossuma9e20242007-03-08 00:43:48 +0000135
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000136class MisbehavedRawIO(MockRawIO):
137 def write(self, b):
138 return super().write(b) * 2
139
140 def read(self, n=None):
141 return super().read(n) * 2
142
143 def seek(self, pos, whence):
144 return -123
145
146 def tell(self):
147 return -456
148
149 def readinto(self, buf):
150 super().readinto(buf)
151 return len(buf) * 5
152
153class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
154 pass
155
156class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
157 pass
158
159
160class CloseFailureIO(MockRawIO):
161 closed = 0
162
163 def close(self):
164 if not self.closed:
165 self.closed = 1
166 raise IOError
167
168class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
169 pass
170
171class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
172 pass
173
174
175class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000176
177 def __init__(self, data):
178 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000179 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000180
181 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000182 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000183 self.read_history.append(None if res is None else len(res))
184 return res
185
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000186 def readinto(self, b):
187 res = super().readinto(b)
188 self.read_history.append(res)
189 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000190
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000191class CMockFileIO(MockFileIO, io.BytesIO):
192 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000193
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000194class PyMockFileIO(MockFileIO, pyio.BytesIO):
195 pass
196
197
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000198class MockUnseekableIO:
199 def seekable(self):
200 return False
201
202 def seek(self, *args):
203 raise self.UnsupportedOperation("not seekable")
204
205 def tell(self, *args):
206 raise self.UnsupportedOperation("not seekable")
207
208class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
209 UnsupportedOperation = io.UnsupportedOperation
210
211class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
212 UnsupportedOperation = pyio.UnsupportedOperation
213
214
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000215class MockNonBlockWriterIO:
216
217 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000218 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000219 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000220
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000221 def pop_written(self):
222 s = b"".join(self._write_stack)
223 self._write_stack[:] = []
224 return s
225
226 def block_on(self, char):
227 """Block when a given char is encountered."""
228 self._blocker_char = char
229
230 def readable(self):
231 return True
232
233 def seekable(self):
234 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000235
Guido van Rossum01a27522007-03-07 01:00:12 +0000236 def writable(self):
237 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000238
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000239 def write(self, b):
240 b = bytes(b)
241 n = -1
242 if self._blocker_char:
243 try:
244 n = b.index(self._blocker_char)
245 except ValueError:
246 pass
247 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100248 if n > 0:
249 # write data up to the first blocker
250 self._write_stack.append(b[:n])
251 return n
252 else:
253 # cancel blocker and indicate would block
254 self._blocker_char = None
255 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000256 self._write_stack.append(b)
257 return len(b)
258
259class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
260 BlockingIOError = io.BlockingIOError
261
262class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
263 BlockingIOError = pyio.BlockingIOError
264
Guido van Rossuma9e20242007-03-08 00:43:48 +0000265
Guido van Rossum28524c72007-02-27 05:47:44 +0000266class IOTest(unittest.TestCase):
267
Neal Norwitze7789b12008-03-24 06:18:09 +0000268 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000269 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000270
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000271 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000272 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000273
Guido van Rossum28524c72007-02-27 05:47:44 +0000274 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000275 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000276 f.truncate(0)
277 self.assertEqual(f.tell(), 5)
278 f.seek(0)
279
280 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000281 self.assertEqual(f.seek(0), 0)
282 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000283 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000284 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000285 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000286 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000287 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000288 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000289 self.assertEqual(f.seek(-1, 2), 13)
290 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000291
Guido van Rossum87429772007-04-10 21:06:59 +0000292 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000293 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000294 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000295
Guido van Rossum9b76da62007-04-11 01:09:03 +0000296 def read_ops(self, f, buffered=False):
297 data = f.read(5)
298 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000299 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000300 self.assertEqual(f.readinto(data), 5)
301 self.assertEqual(data, b" worl")
302 self.assertEqual(f.readinto(data), 2)
303 self.assertEqual(len(data), 5)
304 self.assertEqual(data[:2], b"d\n")
305 self.assertEqual(f.seek(0), 0)
306 self.assertEqual(f.read(20), b"hello world\n")
307 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000308 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000309 self.assertEqual(f.seek(-6, 2), 6)
310 self.assertEqual(f.read(5), b"world")
311 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000312 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000313 self.assertEqual(f.seek(-6, 1), 5)
314 self.assertEqual(f.read(5), b" worl")
315 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000316 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000317 if buffered:
318 f.seek(0)
319 self.assertEqual(f.read(), b"hello world\n")
320 f.seek(6)
321 self.assertEqual(f.read(), b"world\n")
322 self.assertEqual(f.read(), b"")
323
Guido van Rossum34d69e52007-04-10 20:08:41 +0000324 LARGE = 2**31
325
Guido van Rossum53807da2007-04-10 19:01:47 +0000326 def large_file_ops(self, f):
327 assert f.readable()
328 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000329 self.assertEqual(f.seek(self.LARGE), self.LARGE)
330 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000331 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000332 self.assertEqual(f.tell(), self.LARGE + 3)
333 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000334 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000335 self.assertEqual(f.tell(), self.LARGE + 2)
336 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000337 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000338 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000339 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
340 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000341 self.assertEqual(f.read(2), b"x")
342
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000343 def test_invalid_operations(self):
344 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000345 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000346 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000347 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000348 self.assertRaises(exc, fp.read)
349 self.assertRaises(exc, fp.readline)
350 with self.open(support.TESTFN, "wb", buffering=0) as fp:
351 self.assertRaises(exc, fp.read)
352 self.assertRaises(exc, fp.readline)
353 with self.open(support.TESTFN, "rb", buffering=0) as fp:
354 self.assertRaises(exc, fp.write, b"blah")
355 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000356 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000357 self.assertRaises(exc, fp.write, b"blah")
358 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000359 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000360 self.assertRaises(exc, fp.write, "blah")
361 self.assertRaises(exc, fp.writelines, ["blah\n"])
362 # Non-zero seeking from current or end pos
363 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
364 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000365
Antoine Pitrou13348842012-01-29 18:36:34 +0100366 def test_open_handles_NUL_chars(self):
367 fn_with_NUL = 'foo\0bar'
368 self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
369 self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')
370
Guido van Rossum28524c72007-02-27 05:47:44 +0000371 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000372 with self.open(support.TESTFN, "wb", buffering=0) as f:
373 self.assertEqual(f.readable(), False)
374 self.assertEqual(f.writable(), True)
375 self.assertEqual(f.seekable(), True)
376 self.write_ops(f)
377 with self.open(support.TESTFN, "rb", buffering=0) as f:
378 self.assertEqual(f.readable(), True)
379 self.assertEqual(f.writable(), False)
380 self.assertEqual(f.seekable(), True)
381 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000382
Guido van Rossum87429772007-04-10 21:06:59 +0000383 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000384 with self.open(support.TESTFN, "wb") as f:
385 self.assertEqual(f.readable(), False)
386 self.assertEqual(f.writable(), True)
387 self.assertEqual(f.seekable(), True)
388 self.write_ops(f)
389 with self.open(support.TESTFN, "rb") as f:
390 self.assertEqual(f.readable(), True)
391 self.assertEqual(f.writable(), False)
392 self.assertEqual(f.seekable(), True)
393 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000394
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000395 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000396 with self.open(support.TESTFN, "wb") as f:
397 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
398 with self.open(support.TESTFN, "rb") as f:
399 self.assertEqual(f.readline(), b"abc\n")
400 self.assertEqual(f.readline(10), b"def\n")
401 self.assertEqual(f.readline(2), b"xy")
402 self.assertEqual(f.readline(4), b"zzy\n")
403 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000404 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000405 self.assertRaises(TypeError, f.readline, 5.3)
406 with self.open(support.TESTFN, "r") as f:
407 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000408
Guido van Rossum28524c72007-02-27 05:47:44 +0000409 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000410 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000411 self.write_ops(f)
412 data = f.getvalue()
413 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000414 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000415 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000416
Guido van Rossum53807da2007-04-10 19:01:47 +0000417 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000418 # On Windows and Mac OSX this test comsumes large resources; It takes
419 # a long time to build the >2GB file and takes >2GB of disk space
420 # therefore the resource must be enabled to run this test.
421 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000422 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000423 print("\nTesting large file ops skipped on %s." % sys.platform,
424 file=sys.stderr)
425 print("It requires %d bytes and a long time." % self.LARGE,
426 file=sys.stderr)
427 print("Use 'regrtest.py -u largefile test_io' to run it.",
428 file=sys.stderr)
429 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000430 with self.open(support.TESTFN, "w+b", 0) as f:
431 self.large_file_ops(f)
432 with self.open(support.TESTFN, "w+b") as f:
433 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000434
435 def test_with_open(self):
436 for bufsize in (0, 1, 100):
437 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000438 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000439 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000440 self.assertEqual(f.closed, True)
441 f = None
442 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000443 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000444 1/0
445 except ZeroDivisionError:
446 self.assertEqual(f.closed, True)
447 else:
448 self.fail("1/0 didn't raise an exception")
449
Antoine Pitrou08838b62009-01-21 00:55:13 +0000450 # issue 5008
451 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000452 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000453 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000454 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000455 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000456 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000457 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000458 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000459 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000460
Guido van Rossum87429772007-04-10 21:06:59 +0000461 def test_destructor(self):
462 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000463 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000464 def __del__(self):
465 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000466 try:
467 f = super().__del__
468 except AttributeError:
469 pass
470 else:
471 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000472 def close(self):
473 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000474 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000475 def flush(self):
476 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000477 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000478 with support.check_warnings(('', ResourceWarning)):
479 f = MyFileIO(support.TESTFN, "wb")
480 f.write(b"xxx")
481 del f
482 support.gc_collect()
483 self.assertEqual(record, [1, 2, 3])
484 with self.open(support.TESTFN, "rb") as f:
485 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000486
487 def _check_base_destructor(self, base):
488 record = []
489 class MyIO(base):
490 def __init__(self):
491 # This exercises the availability of attributes on object
492 # destruction.
493 # (in the C version, close() is called by the tp_dealloc
494 # function, not by __del__)
495 self.on_del = 1
496 self.on_close = 2
497 self.on_flush = 3
498 def __del__(self):
499 record.append(self.on_del)
500 try:
501 f = super().__del__
502 except AttributeError:
503 pass
504 else:
505 f()
506 def close(self):
507 record.append(self.on_close)
508 super().close()
509 def flush(self):
510 record.append(self.on_flush)
511 super().flush()
512 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000513 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000514 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000515 self.assertEqual(record, [1, 2, 3])
516
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000517 def test_IOBase_destructor(self):
518 self._check_base_destructor(self.IOBase)
519
520 def test_RawIOBase_destructor(self):
521 self._check_base_destructor(self.RawIOBase)
522
523 def test_BufferedIOBase_destructor(self):
524 self._check_base_destructor(self.BufferedIOBase)
525
526 def test_TextIOBase_destructor(self):
527 self._check_base_destructor(self.TextIOBase)
528
Guido van Rossum87429772007-04-10 21:06:59 +0000529 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000530 with self.open(support.TESTFN, "wb") as f:
531 f.write(b"xxx")
532 with self.open(support.TESTFN, "rb") as f:
533 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000534
Guido van Rossumd4103952007-04-12 05:44:49 +0000535 def test_array_writes(self):
536 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000537 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000538 with self.open(support.TESTFN, "wb", 0) as f:
539 self.assertEqual(f.write(a), n)
540 with self.open(support.TESTFN, "wb") as f:
541 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000542
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000543 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000544 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000545 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000546
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000547 def test_read_closed(self):
548 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000549 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000550 with self.open(support.TESTFN, "r") as f:
551 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000552 self.assertEqual(file.read(), "egg\n")
553 file.seek(0)
554 file.close()
555 self.assertRaises(ValueError, file.read)
556
557 def test_no_closefd_with_filename(self):
558 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000559 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000560
561 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000562 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000563 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000564 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000565 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000566 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000567 self.assertEqual(file.buffer.raw.closefd, False)
568
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000569 def test_garbage_collection(self):
570 # FileIO objects are collected, and collecting them flushes
571 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000572 with support.check_warnings(('', ResourceWarning)):
573 f = self.FileIO(support.TESTFN, "wb")
574 f.write(b"abcxxx")
575 f.f = f
576 wr = weakref.ref(f)
577 del f
578 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000579 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000580 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000581 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000582
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000583 def test_unbounded_file(self):
584 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
585 zero = "/dev/zero"
586 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000587 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000588 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000589 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000590 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000591 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000592 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000593 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000594 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000595 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000596 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000597 self.assertRaises(OverflowError, f.read)
598
Antoine Pitrou6be88762010-05-03 16:48:20 +0000599 def test_flush_error_on_close(self):
600 f = self.open(support.TESTFN, "wb", buffering=0)
601 def bad_flush():
602 raise IOError()
603 f.flush = bad_flush
604 self.assertRaises(IOError, f.close) # exception not swallowed
605
606 def test_multi_close(self):
607 f = self.open(support.TESTFN, "wb", buffering=0)
608 f.close()
609 f.close()
610 f.close()
611 self.assertRaises(ValueError, f.flush)
612
Antoine Pitrou328ec742010-09-14 18:37:24 +0000613 def test_RawIOBase_read(self):
614 # Exercise the default RawIOBase.read() implementation (which calls
615 # readinto() internally).
616 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
617 self.assertEqual(rawio.read(2), b"ab")
618 self.assertEqual(rawio.read(2), b"c")
619 self.assertEqual(rawio.read(2), b"d")
620 self.assertEqual(rawio.read(2), None)
621 self.assertEqual(rawio.read(2), b"ef")
622 self.assertEqual(rawio.read(2), b"g")
623 self.assertEqual(rawio.read(2), None)
624 self.assertEqual(rawio.read(2), b"")
625
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400626 def test_types_have_dict(self):
627 test = (
628 self.IOBase(),
629 self.RawIOBase(),
630 self.TextIOBase(),
631 self.StringIO(),
632 self.BytesIO()
633 )
634 for obj in test:
635 self.assertTrue(hasattr(obj, "__dict__"))
636
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200637 def test_fileio_closefd(self):
638 # Issue #4841
639 with self.open(__file__, 'rb') as f1, \
640 self.open(__file__, 'rb') as f2:
641 fileio = self.FileIO(f1.fileno(), closefd=False)
642 # .__init__() must not close f1
643 fileio.__init__(f2.fileno(), closefd=False)
644 f1.readline()
645 # .close() must not close f2
646 fileio.close()
647 f2.readline()
648
649
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000650class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +0200651
652 def test_IOBase_finalize(self):
653 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
654 # class which inherits IOBase and an object of this class are caught
655 # in a reference cycle and close() is already in the method cache.
656 class MyIO(self.IOBase):
657 def close(self):
658 pass
659
660 # create an instance to populate the method cache
661 MyIO()
662 obj = MyIO()
663 obj.obj = obj
664 wr = weakref.ref(obj)
665 del MyIO
666 del obj
667 support.gc_collect()
668 self.assertTrue(wr() is None, wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000669
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000670class PyIOTest(IOTest):
671 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000672
Guido van Rossuma9e20242007-03-08 00:43:48 +0000673
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000674class CommonBufferedTests:
675 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
676
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000677 def test_detach(self):
678 raw = self.MockRawIO()
679 buf = self.tp(raw)
680 self.assertIs(buf.detach(), raw)
681 self.assertRaises(ValueError, buf.detach)
682
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000683 def test_fileno(self):
684 rawio = self.MockRawIO()
685 bufio = self.tp(rawio)
686
Ezio Melottib3aedd42010-11-20 19:04:17 +0000687 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000688
689 def test_no_fileno(self):
690 # XXX will we always have fileno() function? If so, kill
691 # this test. Else, write it.
692 pass
693
694 def test_invalid_args(self):
695 rawio = self.MockRawIO()
696 bufio = self.tp(rawio)
697 # Invalid whence
698 self.assertRaises(ValueError, bufio.seek, 0, -1)
699 self.assertRaises(ValueError, bufio.seek, 0, 3)
700
701 def test_override_destructor(self):
702 tp = self.tp
703 record = []
704 class MyBufferedIO(tp):
705 def __del__(self):
706 record.append(1)
707 try:
708 f = super().__del__
709 except AttributeError:
710 pass
711 else:
712 f()
713 def close(self):
714 record.append(2)
715 super().close()
716 def flush(self):
717 record.append(3)
718 super().flush()
719 rawio = self.MockRawIO()
720 bufio = MyBufferedIO(rawio)
721 writable = bufio.writable()
722 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000723 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000724 if writable:
725 self.assertEqual(record, [1, 2, 3])
726 else:
727 self.assertEqual(record, [1, 2])
728
729 def test_context_manager(self):
730 # Test usability as a context manager
731 rawio = self.MockRawIO()
732 bufio = self.tp(rawio)
733 def _with():
734 with bufio:
735 pass
736 _with()
737 # bufio should now be closed, and using it a second time should raise
738 # a ValueError.
739 self.assertRaises(ValueError, _with)
740
741 def test_error_through_destructor(self):
742 # Test that the exception state is not modified by a destructor,
743 # even if close() fails.
744 rawio = self.CloseFailureIO()
745 def f():
746 self.tp(rawio).xyzzy
747 with support.captured_output("stderr") as s:
748 self.assertRaises(AttributeError, f)
749 s = s.getvalue().strip()
750 if s:
751 # The destructor *may* have printed an unraisable error, check it
752 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000753 self.assertTrue(s.startswith("Exception IOError: "), s)
754 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000755
Antoine Pitrou716c4442009-05-23 19:04:03 +0000756 def test_repr(self):
757 raw = self.MockRawIO()
758 b = self.tp(raw)
759 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
760 self.assertEqual(repr(b), "<%s>" % clsname)
761 raw.name = "dummy"
762 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
763 raw.name = b"dummy"
764 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
765
Antoine Pitrou6be88762010-05-03 16:48:20 +0000766 def test_flush_error_on_close(self):
767 raw = self.MockRawIO()
768 def bad_flush():
769 raise IOError()
770 raw.flush = bad_flush
771 b = self.tp(raw)
772 self.assertRaises(IOError, b.close) # exception not swallowed
773
774 def test_multi_close(self):
775 raw = self.MockRawIO()
776 b = self.tp(raw)
777 b.close()
778 b.close()
779 b.close()
780 self.assertRaises(ValueError, b.flush)
781
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000782 def test_unseekable(self):
783 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
784 self.assertRaises(self.UnsupportedOperation, bufio.tell)
785 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
786
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000787 def test_readonly_attributes(self):
788 raw = self.MockRawIO()
789 buf = self.tp(raw)
790 x = self.MockRawIO()
791 with self.assertRaises(AttributeError):
792 buf.raw = x
793
Guido van Rossum78892e42007-04-06 17:31:18 +0000794
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200795class SizeofTest:
796
797 @support.cpython_only
798 def test_sizeof(self):
799 bufsize1 = 4096
800 bufsize2 = 8192
801 rawio = self.MockRawIO()
802 bufio = self.tp(rawio, buffer_size=bufsize1)
803 size = sys.getsizeof(bufio) - bufsize1
804 rawio = self.MockRawIO()
805 bufio = self.tp(rawio, buffer_size=bufsize2)
806 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
807
808
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000809class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
810 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000811
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000812 def test_constructor(self):
813 rawio = self.MockRawIO([b"abc"])
814 bufio = self.tp(rawio)
815 bufio.__init__(rawio)
816 bufio.__init__(rawio, buffer_size=1024)
817 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000818 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000819 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
820 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
821 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
822 rawio = self.MockRawIO([b"abc"])
823 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000824 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000825
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000826 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000827 for arg in (None, 7):
828 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
829 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000830 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000831 # Invalid args
832 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000833
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000834 def test_read1(self):
835 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
836 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000837 self.assertEqual(b"a", bufio.read(1))
838 self.assertEqual(b"b", bufio.read1(1))
839 self.assertEqual(rawio._reads, 1)
840 self.assertEqual(b"c", bufio.read1(100))
841 self.assertEqual(rawio._reads, 1)
842 self.assertEqual(b"d", bufio.read1(100))
843 self.assertEqual(rawio._reads, 2)
844 self.assertEqual(b"efg", bufio.read1(100))
845 self.assertEqual(rawio._reads, 3)
846 self.assertEqual(b"", bufio.read1(100))
847 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000848 # Invalid args
849 self.assertRaises(ValueError, bufio.read1, -1)
850
851 def test_readinto(self):
852 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
853 bufio = self.tp(rawio)
854 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000855 self.assertEqual(bufio.readinto(b), 2)
856 self.assertEqual(b, b"ab")
857 self.assertEqual(bufio.readinto(b), 2)
858 self.assertEqual(b, b"cd")
859 self.assertEqual(bufio.readinto(b), 2)
860 self.assertEqual(b, b"ef")
861 self.assertEqual(bufio.readinto(b), 1)
862 self.assertEqual(b, b"gf")
863 self.assertEqual(bufio.readinto(b), 0)
864 self.assertEqual(b, b"gf")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000865
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000866 def test_readlines(self):
867 def bufio():
868 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
869 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000870 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
871 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
872 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000873
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000874 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000875 data = b"abcdefghi"
876 dlen = len(data)
877
878 tests = [
879 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
880 [ 100, [ 3, 3, 3], [ dlen ] ],
881 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
882 ]
883
884 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000885 rawio = self.MockFileIO(data)
886 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000887 pos = 0
888 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +0000889 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000890 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000891 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +0000892 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000893
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000894 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000895 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000896 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
897 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000898 self.assertEqual(b"abcd", bufio.read(6))
899 self.assertEqual(b"e", bufio.read(1))
900 self.assertEqual(b"fg", bufio.read())
901 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +0200902 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000903 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000904
Victor Stinnera80987f2011-05-25 22:47:16 +0200905 rawio = self.MockRawIO((b"a", None, None))
906 self.assertEqual(b"a", rawio.readall())
907 self.assertIsNone(rawio.readall())
908
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000909 def test_read_past_eof(self):
910 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
911 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000912
Ezio Melottib3aedd42010-11-20 19:04:17 +0000913 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000914
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000915 def test_read_all(self):
916 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
917 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000918
Ezio Melottib3aedd42010-11-20 19:04:17 +0000919 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000920
Victor Stinner45df8202010-04-28 22:31:17 +0000921 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +0000922 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000923 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000924 try:
925 # Write out many bytes with exactly the same number of 0's,
926 # 1's... 255's. This will help us check that concurrent reading
927 # doesn't duplicate or forget contents.
928 N = 1000
929 l = list(range(256)) * N
930 random.shuffle(l)
931 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000932 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +0000933 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000934 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000935 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000936 errors = []
937 results = []
938 def f():
939 try:
940 # Intra-buffer read then buffer-flushing read
941 for n in cycle([1, 19]):
942 s = bufio.read(n)
943 if not s:
944 break
945 # list.append() is atomic
946 results.append(s)
947 except Exception as e:
948 errors.append(e)
949 raise
950 threads = [threading.Thread(target=f) for x in range(20)]
951 for t in threads:
952 t.start()
953 time.sleep(0.02) # yield
954 for t in threads:
955 t.join()
956 self.assertFalse(errors,
957 "the following exceptions were caught: %r" % errors)
958 s = b''.join(results)
959 for i in range(256):
960 c = bytes(bytearray([i]))
961 self.assertEqual(s.count(c), N)
962 finally:
963 support.unlink(support.TESTFN)
964
Antoine Pitrou1e44fec2011-10-04 12:26:20 +0200965 def test_unseekable(self):
966 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
967 self.assertRaises(self.UnsupportedOperation, bufio.tell)
968 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
969 bufio.read(1)
970 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
971 self.assertRaises(self.UnsupportedOperation, bufio.tell)
972
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000973 def test_misbehaved_io(self):
974 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
975 bufio = self.tp(rawio)
976 self.assertRaises(IOError, bufio.seek, 0)
977 self.assertRaises(IOError, bufio.tell)
978
Antoine Pitrou32cfede2010-08-11 13:31:33 +0000979 def test_no_extraneous_read(self):
980 # Issue #9550; when the raw IO object has satisfied the read request,
981 # we should not issue any additional reads, otherwise it may block
982 # (e.g. socket).
983 bufsize = 16
984 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
985 rawio = self.MockRawIO([b"x" * n])
986 bufio = self.tp(rawio, bufsize)
987 self.assertEqual(bufio.read(n), b"x" * n)
988 # Simple case: one raw read is enough to satisfy the request.
989 self.assertEqual(rawio._extraneous_reads, 0,
990 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
991 # A more complex case where two raw reads are needed to satisfy
992 # the request.
993 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
994 bufio = self.tp(rawio, bufsize)
995 self.assertEqual(bufio.read(n), b"x" * n)
996 self.assertEqual(rawio._extraneous_reads, 0,
997 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
998
999
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001000class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001001 tp = io.BufferedReader
1002
1003 def test_constructor(self):
1004 BufferedReaderTest.test_constructor(self)
1005 # The allocation can succeed on 32-bit builds, e.g. with more
1006 # than 2GB RAM and a 64-bit kernel.
1007 if sys.maxsize > 0x7FFFFFFF:
1008 rawio = self.MockRawIO()
1009 bufio = self.tp(rawio)
1010 self.assertRaises((OverflowError, MemoryError, ValueError),
1011 bufio.__init__, rawio, sys.maxsize)
1012
1013 def test_initialization(self):
1014 rawio = self.MockRawIO([b"abc"])
1015 bufio = self.tp(rawio)
1016 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1017 self.assertRaises(ValueError, bufio.read)
1018 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1019 self.assertRaises(ValueError, bufio.read)
1020 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1021 self.assertRaises(ValueError, bufio.read)
1022
1023 def test_misbehaved_io_read(self):
1024 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1025 bufio = self.tp(rawio)
1026 # _pyio.BufferedReader seems to implement reading different, so that
1027 # checking this is not so easy.
1028 self.assertRaises(IOError, bufio.read, 10)
1029
1030 def test_garbage_collection(self):
1031 # C BufferedReader objects are collected.
1032 # The Python version has __del__, so it ends into gc.garbage instead
1033 rawio = self.FileIO(support.TESTFN, "w+b")
1034 f = self.tp(rawio)
1035 f.f = f
1036 wr = weakref.ref(f)
1037 del f
Benjamin Peterson45cec322009-04-24 23:14:50 +00001038 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001039 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001040
1041class PyBufferedReaderTest(BufferedReaderTest):
1042 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001043
Guido van Rossuma9e20242007-03-08 00:43:48 +00001044
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001045class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1046 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001047
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001048 def test_constructor(self):
1049 rawio = self.MockRawIO()
1050 bufio = self.tp(rawio)
1051 bufio.__init__(rawio)
1052 bufio.__init__(rawio, buffer_size=1024)
1053 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001054 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001055 bufio.flush()
1056 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1057 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1058 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1059 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001060 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001061 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001062 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001063
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001064 def test_detach_flush(self):
1065 raw = self.MockRawIO()
1066 buf = self.tp(raw)
1067 buf.write(b"howdy!")
1068 self.assertFalse(raw._write_stack)
1069 buf.detach()
1070 self.assertEqual(raw._write_stack, [b"howdy!"])
1071
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001072 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001073 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001074 writer = self.MockRawIO()
1075 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001076 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001077 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001078
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001079 def test_write_overflow(self):
1080 writer = self.MockRawIO()
1081 bufio = self.tp(writer, 8)
1082 contents = b"abcdefghijklmnop"
1083 for n in range(0, len(contents), 3):
1084 bufio.write(contents[n:n+3])
1085 flushed = b"".join(writer._write_stack)
1086 # At least (total - 8) bytes were implicitly flushed, perhaps more
1087 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001088 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001089
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001090 def check_writes(self, intermediate_func):
1091 # Lots of writes, test the flushed output is as expected.
1092 contents = bytes(range(256)) * 1000
1093 n = 0
1094 writer = self.MockRawIO()
1095 bufio = self.tp(writer, 13)
1096 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1097 def gen_sizes():
1098 for size in count(1):
1099 for i in range(15):
1100 yield size
1101 sizes = gen_sizes()
1102 while n < len(contents):
1103 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001104 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001105 intermediate_func(bufio)
1106 n += size
1107 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001108 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001109
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001110 def test_writes(self):
1111 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001112
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001113 def test_writes_and_flushes(self):
1114 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001115
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001116 def test_writes_and_seeks(self):
1117 def _seekabs(bufio):
1118 pos = bufio.tell()
1119 bufio.seek(pos + 1, 0)
1120 bufio.seek(pos - 1, 0)
1121 bufio.seek(pos, 0)
1122 self.check_writes(_seekabs)
1123 def _seekrel(bufio):
1124 pos = bufio.seek(0, 1)
1125 bufio.seek(+1, 1)
1126 bufio.seek(-1, 1)
1127 bufio.seek(pos, 0)
1128 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001129
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001130 def test_writes_and_truncates(self):
1131 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001132
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001133 def test_write_non_blocking(self):
1134 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001135 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001136
Ezio Melottib3aedd42010-11-20 19:04:17 +00001137 self.assertEqual(bufio.write(b"abcd"), 4)
1138 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001139 # 1 byte will be written, the rest will be buffered
1140 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001141 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001142
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001143 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1144 raw.block_on(b"0")
1145 try:
1146 bufio.write(b"opqrwxyz0123456789")
1147 except self.BlockingIOError as e:
1148 written = e.characters_written
1149 else:
1150 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001151 self.assertEqual(written, 16)
1152 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001153 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001154
Ezio Melottib3aedd42010-11-20 19:04:17 +00001155 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001156 s = raw.pop_written()
1157 # Previously buffered bytes were flushed
1158 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001159
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001160 def test_write_and_rewind(self):
1161 raw = io.BytesIO()
1162 bufio = self.tp(raw, 4)
1163 self.assertEqual(bufio.write(b"abcdef"), 6)
1164 self.assertEqual(bufio.tell(), 6)
1165 bufio.seek(0, 0)
1166 self.assertEqual(bufio.write(b"XY"), 2)
1167 bufio.seek(6, 0)
1168 self.assertEqual(raw.getvalue(), b"XYcdef")
1169 self.assertEqual(bufio.write(b"123456"), 6)
1170 bufio.flush()
1171 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001172
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001173 def test_flush(self):
1174 writer = self.MockRawIO()
1175 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001176 bufio.write(b"abc")
1177 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001178 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001179
Antoine Pitrou131a4892012-10-16 22:57:11 +02001180 def test_writelines(self):
1181 l = [b'ab', b'cd', b'ef']
1182 writer = self.MockRawIO()
1183 bufio = self.tp(writer, 8)
1184 bufio.writelines(l)
1185 bufio.flush()
1186 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1187
1188 def test_writelines_userlist(self):
1189 l = UserList([b'ab', b'cd', b'ef'])
1190 writer = self.MockRawIO()
1191 bufio = self.tp(writer, 8)
1192 bufio.writelines(l)
1193 bufio.flush()
1194 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1195
1196 def test_writelines_error(self):
1197 writer = self.MockRawIO()
1198 bufio = self.tp(writer, 8)
1199 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1200 self.assertRaises(TypeError, bufio.writelines, None)
1201 self.assertRaises(TypeError, bufio.writelines, 'abc')
1202
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001203 def test_destructor(self):
1204 writer = self.MockRawIO()
1205 bufio = self.tp(writer, 8)
1206 bufio.write(b"abc")
1207 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001208 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001209 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001210
1211 def test_truncate(self):
1212 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001213 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001214 bufio = self.tp(raw, 8)
1215 bufio.write(b"abcdef")
1216 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001217 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001218 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001219 self.assertEqual(f.read(), b"abc")
1220
Victor Stinner45df8202010-04-28 22:31:17 +00001221 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001222 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001223 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001224 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001225 # Write out many bytes from many threads and test they were
1226 # all flushed.
1227 N = 1000
1228 contents = bytes(range(256)) * N
1229 sizes = cycle([1, 19])
1230 n = 0
1231 queue = deque()
1232 while n < len(contents):
1233 size = next(sizes)
1234 queue.append(contents[n:n+size])
1235 n += size
1236 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001237 # We use a real file object because it allows us to
1238 # exercise situations where the GIL is released before
1239 # writing the buffer to the raw streams. This is in addition
1240 # to concurrency issues due to switching threads in the middle
1241 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001242 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001243 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001244 errors = []
1245 def f():
1246 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001247 while True:
1248 try:
1249 s = queue.popleft()
1250 except IndexError:
1251 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001252 bufio.write(s)
1253 except Exception as e:
1254 errors.append(e)
1255 raise
1256 threads = [threading.Thread(target=f) for x in range(20)]
1257 for t in threads:
1258 t.start()
1259 time.sleep(0.02) # yield
1260 for t in threads:
1261 t.join()
1262 self.assertFalse(errors,
1263 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001264 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001265 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001266 s = f.read()
1267 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001268 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001269 finally:
1270 support.unlink(support.TESTFN)
1271
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001272 def test_misbehaved_io(self):
1273 rawio = self.MisbehavedRawIO()
1274 bufio = self.tp(rawio, 5)
1275 self.assertRaises(IOError, bufio.seek, 0)
1276 self.assertRaises(IOError, bufio.tell)
1277 self.assertRaises(IOError, bufio.write, b"abcdef")
1278
Benjamin Peterson59406a92009-03-26 17:10:29 +00001279 def test_max_buffer_size_deprecation(self):
Florent Xicluna8fbddf12010-03-17 20:29:51 +00001280 with support.check_warnings(("max_buffer_size is deprecated",
1281 DeprecationWarning)):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001282 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001283
1284
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001285class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001286 tp = io.BufferedWriter
1287
1288 def test_constructor(self):
1289 BufferedWriterTest.test_constructor(self)
1290 # The allocation can succeed on 32-bit builds, e.g. with more
1291 # than 2GB RAM and a 64-bit kernel.
1292 if sys.maxsize > 0x7FFFFFFF:
1293 rawio = self.MockRawIO()
1294 bufio = self.tp(rawio)
1295 self.assertRaises((OverflowError, MemoryError, ValueError),
1296 bufio.__init__, rawio, sys.maxsize)
1297
1298 def test_initialization(self):
1299 rawio = self.MockRawIO()
1300 bufio = self.tp(rawio)
1301 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1302 self.assertRaises(ValueError, bufio.write, b"def")
1303 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1304 self.assertRaises(ValueError, bufio.write, b"def")
1305 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1306 self.assertRaises(ValueError, bufio.write, b"def")
1307
1308 def test_garbage_collection(self):
1309 # C BufferedWriter objects are collected, and collecting them flushes
1310 # all data to disk.
1311 # The Python version has __del__, so it ends into gc.garbage instead
1312 rawio = self.FileIO(support.TESTFN, "w+b")
1313 f = self.tp(rawio)
1314 f.write(b"123xxx")
1315 f.x = f
1316 wr = weakref.ref(f)
1317 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001318 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001319 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001320 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001321 self.assertEqual(f.read(), b"123xxx")
1322
1323
1324class PyBufferedWriterTest(BufferedWriterTest):
1325 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001326
Guido van Rossum01a27522007-03-07 01:00:12 +00001327class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001328
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001329 def test_constructor(self):
1330 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001331 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001332
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001333 def test_detach(self):
1334 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1335 self.assertRaises(self.UnsupportedOperation, pair.detach)
1336
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001337 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna8fbddf12010-03-17 20:29:51 +00001338 with support.check_warnings(("max_buffer_size is deprecated",
1339 DeprecationWarning)):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001340 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001341
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001342 def test_constructor_with_not_readable(self):
1343 class NotReadable(MockRawIO):
1344 def readable(self):
1345 return False
1346
1347 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1348
1349 def test_constructor_with_not_writeable(self):
1350 class NotWriteable(MockRawIO):
1351 def writable(self):
1352 return False
1353
1354 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1355
1356 def test_read(self):
1357 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1358
1359 self.assertEqual(pair.read(3), b"abc")
1360 self.assertEqual(pair.read(1), b"d")
1361 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001362 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1363 self.assertEqual(pair.read(None), b"abc")
1364
1365 def test_readlines(self):
1366 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1367 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1368 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1369 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001370
1371 def test_read1(self):
1372 # .read1() is delegated to the underlying reader object, so this test
1373 # can be shallow.
1374 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1375
1376 self.assertEqual(pair.read1(3), b"abc")
1377
1378 def test_readinto(self):
1379 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1380
1381 data = bytearray(5)
1382 self.assertEqual(pair.readinto(data), 5)
1383 self.assertEqual(data, b"abcde")
1384
1385 def test_write(self):
1386 w = self.MockRawIO()
1387 pair = self.tp(self.MockRawIO(), w)
1388
1389 pair.write(b"abc")
1390 pair.flush()
1391 pair.write(b"def")
1392 pair.flush()
1393 self.assertEqual(w._write_stack, [b"abc", b"def"])
1394
1395 def test_peek(self):
1396 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1397
1398 self.assertTrue(pair.peek(3).startswith(b"abc"))
1399 self.assertEqual(pair.read(3), b"abc")
1400
1401 def test_readable(self):
1402 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1403 self.assertTrue(pair.readable())
1404
1405 def test_writeable(self):
1406 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1407 self.assertTrue(pair.writable())
1408
1409 def test_seekable(self):
1410 # BufferedRWPairs are never seekable, even if their readers and writers
1411 # are.
1412 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1413 self.assertFalse(pair.seekable())
1414
1415 # .flush() is delegated to the underlying writer object and has been
1416 # tested in the test_write method.
1417
1418 def test_close_and_closed(self):
1419 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1420 self.assertFalse(pair.closed)
1421 pair.close()
1422 self.assertTrue(pair.closed)
1423
1424 def test_isatty(self):
1425 class SelectableIsAtty(MockRawIO):
1426 def __init__(self, isatty):
1427 MockRawIO.__init__(self)
1428 self._isatty = isatty
1429
1430 def isatty(self):
1431 return self._isatty
1432
1433 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1434 self.assertFalse(pair.isatty())
1435
1436 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1437 self.assertTrue(pair.isatty())
1438
1439 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1440 self.assertTrue(pair.isatty())
1441
1442 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1443 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001444
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001445class CBufferedRWPairTest(BufferedRWPairTest):
1446 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001447
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001448class PyBufferedRWPairTest(BufferedRWPairTest):
1449 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001450
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001451
1452class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1453 read_mode = "rb+"
1454 write_mode = "wb+"
1455
1456 def test_constructor(self):
1457 BufferedReaderTest.test_constructor(self)
1458 BufferedWriterTest.test_constructor(self)
1459
1460 def test_read_and_write(self):
1461 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001462 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001463
1464 self.assertEqual(b"as", rw.read(2))
1465 rw.write(b"ddd")
1466 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001467 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001468 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001469 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001470
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001471 def test_seek_and_tell(self):
1472 raw = self.BytesIO(b"asdfghjkl")
1473 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001474
Ezio Melottib3aedd42010-11-20 19:04:17 +00001475 self.assertEqual(b"as", rw.read(2))
1476 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001477 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001478 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00001479
Antoine Pitroue05565e2011-08-20 14:39:23 +02001480 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00001481 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02001482 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001483 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001484 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001485 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001486 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001487 self.assertEqual(7, rw.tell())
1488 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02001489 rw.flush()
1490 self.assertEqual(b"asdf123fl", raw.getvalue())
1491
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001492 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001493
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001494 def check_flush_and_read(self, read_func):
1495 raw = self.BytesIO(b"abcdefghi")
1496 bufio = self.tp(raw)
1497
Ezio Melottib3aedd42010-11-20 19:04:17 +00001498 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001499 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001500 self.assertEqual(b"ef", read_func(bufio, 2))
1501 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001502 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001503 self.assertEqual(6, bufio.tell())
1504 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001505 raw.seek(0, 0)
1506 raw.write(b"XYZ")
1507 # flush() resets the read buffer
1508 bufio.flush()
1509 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001510 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001511
1512 def test_flush_and_read(self):
1513 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1514
1515 def test_flush_and_readinto(self):
1516 def _readinto(bufio, n=-1):
1517 b = bytearray(n if n >= 0 else 9999)
1518 n = bufio.readinto(b)
1519 return bytes(b[:n])
1520 self.check_flush_and_read(_readinto)
1521
1522 def test_flush_and_peek(self):
1523 def _peek(bufio, n=-1):
1524 # This relies on the fact that the buffer can contain the whole
1525 # raw stream, otherwise peek() can return less.
1526 b = bufio.peek(n)
1527 if n != -1:
1528 b = b[:n]
1529 bufio.seek(len(b), 1)
1530 return b
1531 self.check_flush_and_read(_peek)
1532
1533 def test_flush_and_write(self):
1534 raw = self.BytesIO(b"abcdefghi")
1535 bufio = self.tp(raw)
1536
1537 bufio.write(b"123")
1538 bufio.flush()
1539 bufio.write(b"45")
1540 bufio.flush()
1541 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001542 self.assertEqual(b"12345fghi", raw.getvalue())
1543 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001544
1545 def test_threads(self):
1546 BufferedReaderTest.test_threads(self)
1547 BufferedWriterTest.test_threads(self)
1548
1549 def test_writes_and_peek(self):
1550 def _peek(bufio):
1551 bufio.peek(1)
1552 self.check_writes(_peek)
1553 def _peek(bufio):
1554 pos = bufio.tell()
1555 bufio.seek(-1, 1)
1556 bufio.peek(1)
1557 bufio.seek(pos, 0)
1558 self.check_writes(_peek)
1559
1560 def test_writes_and_reads(self):
1561 def _read(bufio):
1562 bufio.seek(-1, 1)
1563 bufio.read(1)
1564 self.check_writes(_read)
1565
1566 def test_writes_and_read1s(self):
1567 def _read1(bufio):
1568 bufio.seek(-1, 1)
1569 bufio.read1(1)
1570 self.check_writes(_read1)
1571
1572 def test_writes_and_readintos(self):
1573 def _read(bufio):
1574 bufio.seek(-1, 1)
1575 bufio.readinto(bytearray(1))
1576 self.check_writes(_read)
1577
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001578 def test_write_after_readahead(self):
1579 # Issue #6629: writing after the buffer was filled by readahead should
1580 # first rewind the raw stream.
1581 for overwrite_size in [1, 5]:
1582 raw = self.BytesIO(b"A" * 10)
1583 bufio = self.tp(raw, 4)
1584 # Trigger readahead
1585 self.assertEqual(bufio.read(1), b"A")
1586 self.assertEqual(bufio.tell(), 1)
1587 # Overwriting should rewind the raw stream if it needs so
1588 bufio.write(b"B" * overwrite_size)
1589 self.assertEqual(bufio.tell(), overwrite_size + 1)
1590 # If the write size was smaller than the buffer size, flush() and
1591 # check that rewind happens.
1592 bufio.flush()
1593 self.assertEqual(bufio.tell(), overwrite_size + 1)
1594 s = raw.getvalue()
1595 self.assertEqual(s,
1596 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1597
Antoine Pitrou7c404892011-05-13 00:13:33 +02001598 def test_write_rewind_write(self):
1599 # Various combinations of reading / writing / seeking backwards / writing again
1600 def mutate(bufio, pos1, pos2):
1601 assert pos2 >= pos1
1602 # Fill the buffer
1603 bufio.seek(pos1)
1604 bufio.read(pos2 - pos1)
1605 bufio.write(b'\x02')
1606 # This writes earlier than the previous write, but still inside
1607 # the buffer.
1608 bufio.seek(pos1)
1609 bufio.write(b'\x01')
1610
1611 b = b"\x80\x81\x82\x83\x84"
1612 for i in range(0, len(b)):
1613 for j in range(i, len(b)):
1614 raw = self.BytesIO(b)
1615 bufio = self.tp(raw, 100)
1616 mutate(bufio, i, j)
1617 bufio.flush()
1618 expected = bytearray(b)
1619 expected[j] = 2
1620 expected[i] = 1
1621 self.assertEqual(raw.getvalue(), expected,
1622 "failed result for i=%d, j=%d" % (i, j))
1623
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001624 def test_truncate_after_read_or_write(self):
1625 raw = self.BytesIO(b"A" * 10)
1626 bufio = self.tp(raw, 100)
1627 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1628 self.assertEqual(bufio.truncate(), 2)
1629 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1630 self.assertEqual(bufio.truncate(), 4)
1631
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001632 def test_misbehaved_io(self):
1633 BufferedReaderTest.test_misbehaved_io(self)
1634 BufferedWriterTest.test_misbehaved_io(self)
1635
Antoine Pitroue05565e2011-08-20 14:39:23 +02001636 def test_interleaved_read_write(self):
1637 # Test for issue #12213
1638 with self.BytesIO(b'abcdefgh') as raw:
1639 with self.tp(raw, 100) as f:
1640 f.write(b"1")
1641 self.assertEqual(f.read(1), b'b')
1642 f.write(b'2')
1643 self.assertEqual(f.read1(1), b'd')
1644 f.write(b'3')
1645 buf = bytearray(1)
1646 f.readinto(buf)
1647 self.assertEqual(buf, b'f')
1648 f.write(b'4')
1649 self.assertEqual(f.peek(1), b'h')
1650 f.flush()
1651 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1652
1653 with self.BytesIO(b'abc') as raw:
1654 with self.tp(raw, 100) as f:
1655 self.assertEqual(f.read(1), b'a')
1656 f.write(b"2")
1657 self.assertEqual(f.read(1), b'c')
1658 f.flush()
1659 self.assertEqual(raw.getvalue(), b'a2c')
1660
1661 def test_interleaved_readline_write(self):
1662 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1663 with self.tp(raw) as f:
1664 f.write(b'1')
1665 self.assertEqual(f.readline(), b'b\n')
1666 f.write(b'2')
1667 self.assertEqual(f.readline(), b'def\n')
1668 f.write(b'3')
1669 self.assertEqual(f.readline(), b'\n')
1670 f.flush()
1671 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1672
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001673 # You can't construct a BufferedRandom over a non-seekable stream.
1674 test_unseekable = None
1675
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001676class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001677 tp = io.BufferedRandom
1678
1679 def test_constructor(self):
1680 BufferedRandomTest.test_constructor(self)
1681 # The allocation can succeed on 32-bit builds, e.g. with more
1682 # than 2GB RAM and a 64-bit kernel.
1683 if sys.maxsize > 0x7FFFFFFF:
1684 rawio = self.MockRawIO()
1685 bufio = self.tp(rawio)
1686 self.assertRaises((OverflowError, MemoryError, ValueError),
1687 bufio.__init__, rawio, sys.maxsize)
1688
1689 def test_garbage_collection(self):
1690 CBufferedReaderTest.test_garbage_collection(self)
1691 CBufferedWriterTest.test_garbage_collection(self)
1692
1693class PyBufferedRandomTest(BufferedRandomTest):
1694 tp = pyio.BufferedRandom
1695
1696
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001697# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1698# properties:
1699# - A single output character can correspond to many bytes of input.
1700# - The number of input bytes to complete the character can be
1701# undetermined until the last input byte is received.
1702# - The number of input bytes can vary depending on previous input.
1703# - A single input byte can correspond to many characters of output.
1704# - The number of output characters can be undetermined until the
1705# last input byte is received.
1706# - The number of output characters can vary depending on previous input.
1707
1708class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1709 """
1710 For testing seek/tell behavior with a stateful, buffering decoder.
1711
1712 Input is a sequence of words. Words may be fixed-length (length set
1713 by input) or variable-length (period-terminated). In variable-length
1714 mode, extra periods are ignored. Possible words are:
1715 - 'i' followed by a number sets the input length, I (maximum 99).
1716 When I is set to 0, words are space-terminated.
1717 - 'o' followed by a number sets the output length, O (maximum 99).
1718 - Any other word is converted into a word followed by a period on
1719 the output. The output word consists of the input word truncated
1720 or padded out with hyphens to make its length equal to O. If O
1721 is 0, the word is output verbatim without truncating or padding.
1722 I and O are initially set to 1. When I changes, any buffered input is
1723 re-scanned according to the new I. EOF also terminates the last word.
1724 """
1725
1726 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001727 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001728 self.reset()
1729
1730 def __repr__(self):
1731 return '<SID %x>' % id(self)
1732
1733 def reset(self):
1734 self.i = 1
1735 self.o = 1
1736 self.buffer = bytearray()
1737
1738 def getstate(self):
1739 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1740 return bytes(self.buffer), i*100 + o
1741
1742 def setstate(self, state):
1743 buffer, io = state
1744 self.buffer = bytearray(buffer)
1745 i, o = divmod(io, 100)
1746 self.i, self.o = i ^ 1, o ^ 1
1747
1748 def decode(self, input, final=False):
1749 output = ''
1750 for b in input:
1751 if self.i == 0: # variable-length, terminated with period
1752 if b == ord('.'):
1753 if self.buffer:
1754 output += self.process_word()
1755 else:
1756 self.buffer.append(b)
1757 else: # fixed-length, terminate after self.i bytes
1758 self.buffer.append(b)
1759 if len(self.buffer) == self.i:
1760 output += self.process_word()
1761 if final and self.buffer: # EOF terminates the last word
1762 output += self.process_word()
1763 return output
1764
1765 def process_word(self):
1766 output = ''
1767 if self.buffer[0] == ord('i'):
1768 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1769 elif self.buffer[0] == ord('o'):
1770 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1771 else:
1772 output = self.buffer.decode('ascii')
1773 if len(output) < self.o:
1774 output += '-'*self.o # pad out with hyphens
1775 if self.o:
1776 output = output[:self.o] # truncate to output length
1777 output += '.'
1778 self.buffer = bytearray()
1779 return output
1780
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001781 codecEnabled = False
1782
1783 @classmethod
1784 def lookupTestDecoder(cls, name):
1785 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001786 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001787 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001788 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001789 incrementalencoder=None,
1790 streamreader=None, streamwriter=None,
1791 incrementaldecoder=cls)
1792
1793# Register the previous decoder for testing.
1794# Disabled by default, tests will enable it.
1795codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1796
1797
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001798class StatefulIncrementalDecoderTest(unittest.TestCase):
1799 """
1800 Make sure the StatefulIncrementalDecoder actually works.
1801 """
1802
1803 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001804 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001805 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001806 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001807 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001808 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001809 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001810 # I=0, O=6 (variable-length input, fixed-length output)
1811 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1812 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001813 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001814 # I=6, O=3 (fixed-length input > fixed-length output)
1815 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1816 # I=0, then 3; O=29, then 15 (with longer output)
1817 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1818 'a----------------------------.' +
1819 'b----------------------------.' +
1820 'cde--------------------------.' +
1821 'abcdefghijabcde.' +
1822 'a.b------------.' +
1823 '.c.------------.' +
1824 'd.e------------.' +
1825 'k--------------.' +
1826 'l--------------.' +
1827 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001828 ]
1829
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001830 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001831 # Try a few one-shot test cases.
1832 for input, eof, output in self.test_cases:
1833 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001834 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001835
1836 # Also test an unfinished decode, followed by forcing EOF.
1837 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001838 self.assertEqual(d.decode(b'oiabcd'), '')
1839 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001840
1841class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001842
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001843 def setUp(self):
1844 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1845 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001846 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001847
Guido van Rossumd0712812007-04-11 16:32:43 +00001848 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001849 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001850
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001851 def test_constructor(self):
1852 r = self.BytesIO(b"\xc3\xa9\n\n")
1853 b = self.BufferedReader(r, 1000)
1854 t = self.TextIOWrapper(b)
1855 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001856 self.assertEqual(t.encoding, "latin1")
1857 self.assertEqual(t.line_buffering, False)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001858 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001859 self.assertEqual(t.encoding, "utf8")
1860 self.assertEqual(t.line_buffering, True)
1861 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001862 self.assertRaises(TypeError, t.__init__, b, newline=42)
1863 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1864
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001865 def test_detach(self):
1866 r = self.BytesIO()
1867 b = self.BufferedWriter(r)
1868 t = self.TextIOWrapper(b)
1869 self.assertIs(t.detach(), b)
1870
1871 t = self.TextIOWrapper(b, encoding="ascii")
1872 t.write("howdy")
1873 self.assertFalse(r.getvalue())
1874 t.detach()
1875 self.assertEqual(r.getvalue(), b"howdy")
1876 self.assertRaises(ValueError, t.detach)
1877
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001878 def test_repr(self):
1879 raw = self.BytesIO("hello".encode("utf-8"))
1880 b = self.BufferedReader(raw)
1881 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001882 modname = self.TextIOWrapper.__module__
1883 self.assertEqual(repr(t),
1884 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1885 raw.name = "dummy"
1886 self.assertEqual(repr(t),
1887 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001888 t.mode = "r"
1889 self.assertEqual(repr(t),
1890 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001891 raw.name = b"dummy"
1892 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001893 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001894
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001895 def test_line_buffering(self):
1896 r = self.BytesIO()
1897 b = self.BufferedWriter(r, 1000)
1898 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001899 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001900 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001901 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001902 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001903 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001904 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001905
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001906 def test_encoding(self):
1907 # Check the encoding attribute is always set, and valid
1908 b = self.BytesIO()
1909 t = self.TextIOWrapper(b, encoding="utf8")
1910 self.assertEqual(t.encoding, "utf8")
1911 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001912 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001913 codecs.lookup(t.encoding)
1914
1915 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001916 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001917 b = self.BytesIO(b"abc\n\xff\n")
1918 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001919 self.assertRaises(UnicodeError, t.read)
1920 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001921 b = self.BytesIO(b"abc\n\xff\n")
1922 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001923 self.assertRaises(UnicodeError, t.read)
1924 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001925 b = self.BytesIO(b"abc\n\xff\n")
1926 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001927 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001928 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001929 b = self.BytesIO(b"abc\n\xff\n")
1930 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001931 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001932
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001933 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001934 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001935 b = self.BytesIO()
1936 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001937 self.assertRaises(UnicodeError, t.write, "\xff")
1938 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001939 b = self.BytesIO()
1940 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001941 self.assertRaises(UnicodeError, t.write, "\xff")
1942 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001943 b = self.BytesIO()
1944 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001945 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001946 t.write("abc\xffdef\n")
1947 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001948 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001949 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001950 b = self.BytesIO()
1951 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001952 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001953 t.write("abc\xffdef\n")
1954 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001955 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001956
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001957 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001958 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1959
1960 tests = [
1961 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001962 [ '', input_lines ],
1963 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1964 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1965 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001966 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001967 encodings = (
1968 'utf-8', 'latin-1',
1969 'utf-16', 'utf-16-le', 'utf-16-be',
1970 'utf-32', 'utf-32-le', 'utf-32-be',
1971 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001972
Guido van Rossum8358db22007-08-18 21:39:55 +00001973 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001974 # character in TextIOWrapper._pending_line.
1975 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001976 # XXX: str.encode() should return bytes
1977 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001978 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001979 for bufsize in range(1, 10):
1980 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001981 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1982 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001983 encoding=encoding)
1984 if do_reads:
1985 got_lines = []
1986 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001987 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001988 if c2 == '':
1989 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00001990 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001991 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001992 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001993 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001994
1995 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001996 self.assertEqual(got_line, exp_line)
1997 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00001998
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001999 def test_newlines_input(self):
2000 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002001 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2002 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002003 (None, normalized.decode("ascii").splitlines(True)),
2004 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002005 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2006 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2007 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002008 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002009 buf = self.BytesIO(testdata)
2010 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002011 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002012 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002013 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002014
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002015 def test_newlines_output(self):
2016 testdict = {
2017 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2018 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2019 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2020 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2021 }
2022 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2023 for newline, expected in tests:
2024 buf = self.BytesIO()
2025 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2026 txt.write("AAA\nB")
2027 txt.write("BB\nCCC\n")
2028 txt.write("X\rY\r\nZ")
2029 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002030 self.assertEqual(buf.closed, False)
2031 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002032
2033 def test_destructor(self):
2034 l = []
2035 base = self.BytesIO
2036 class MyBytesIO(base):
2037 def close(self):
2038 l.append(self.getvalue())
2039 base.close(self)
2040 b = MyBytesIO()
2041 t = self.TextIOWrapper(b, encoding="ascii")
2042 t.write("abc")
2043 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002044 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002045 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002046
2047 def test_override_destructor(self):
2048 record = []
2049 class MyTextIO(self.TextIOWrapper):
2050 def __del__(self):
2051 record.append(1)
2052 try:
2053 f = super().__del__
2054 except AttributeError:
2055 pass
2056 else:
2057 f()
2058 def close(self):
2059 record.append(2)
2060 super().close()
2061 def flush(self):
2062 record.append(3)
2063 super().flush()
2064 b = self.BytesIO()
2065 t = MyTextIO(b, encoding="ascii")
2066 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002067 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002068 self.assertEqual(record, [1, 2, 3])
2069
2070 def test_error_through_destructor(self):
2071 # Test that the exception state is not modified by a destructor,
2072 # even if close() fails.
2073 rawio = self.CloseFailureIO()
2074 def f():
2075 self.TextIOWrapper(rawio).xyzzy
2076 with support.captured_output("stderr") as s:
2077 self.assertRaises(AttributeError, f)
2078 s = s.getvalue().strip()
2079 if s:
2080 # The destructor *may* have printed an unraisable error, check it
2081 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002082 self.assertTrue(s.startswith("Exception IOError: "), s)
2083 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002084
Guido van Rossum9b76da62007-04-11 01:09:03 +00002085 # Systematic tests of the text I/O API
2086
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002087 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002088 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
2089 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002090 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002091 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002092 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002093 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002094 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002095 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002096 self.assertEqual(f.tell(), 0)
2097 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002098 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002099 self.assertEqual(f.seek(0), 0)
2100 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002101 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002102 self.assertEqual(f.read(2), "ab")
2103 self.assertEqual(f.read(1), "c")
2104 self.assertEqual(f.read(1), "")
2105 self.assertEqual(f.read(), "")
2106 self.assertEqual(f.tell(), cookie)
2107 self.assertEqual(f.seek(0), 0)
2108 self.assertEqual(f.seek(0, 2), cookie)
2109 self.assertEqual(f.write("def"), 3)
2110 self.assertEqual(f.seek(cookie), cookie)
2111 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002112 if enc.startswith("utf"):
2113 self.multi_line_test(f, enc)
2114 f.close()
2115
2116 def multi_line_test(self, f, enc):
2117 f.seek(0)
2118 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002119 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002120 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002121 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002122 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002123 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002124 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002125 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002126 wlines.append((f.tell(), line))
2127 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002128 f.seek(0)
2129 rlines = []
2130 while True:
2131 pos = f.tell()
2132 line = f.readline()
2133 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002134 break
2135 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002136 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002137
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002138 def test_telling(self):
2139 f = self.open(support.TESTFN, "w+", encoding="utf8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002140 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002141 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002142 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002143 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002144 p2 = f.tell()
2145 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002146 self.assertEqual(f.tell(), p0)
2147 self.assertEqual(f.readline(), "\xff\n")
2148 self.assertEqual(f.tell(), p1)
2149 self.assertEqual(f.readline(), "\xff\n")
2150 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002151 f.seek(0)
2152 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002153 self.assertEqual(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002154 self.assertRaises(IOError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002155 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002156 f.close()
2157
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002158 def test_seeking(self):
2159 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002160 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002161 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002162 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002163 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002164 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002165 suffix = bytes(u_suffix.encode("utf-8"))
2166 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002167 with self.open(support.TESTFN, "wb") as f:
2168 f.write(line*2)
2169 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2170 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002171 self.assertEqual(s, str(prefix, "ascii"))
2172 self.assertEqual(f.tell(), prefix_size)
2173 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002174
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002175 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002176 # Regression test for a specific bug
2177 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002178 with self.open(support.TESTFN, "wb") as f:
2179 f.write(data)
2180 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2181 f._CHUNK_SIZE # Just test that it exists
2182 f._CHUNK_SIZE = 2
2183 f.readline()
2184 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002185
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002186 def test_seek_and_tell(self):
2187 #Test seek/tell using the StatefulIncrementalDecoder.
2188 # Make test faster by doing smaller seeks
2189 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002190
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002191 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002192 """Tell/seek to various points within a data stream and ensure
2193 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002194 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002195 f.write(data)
2196 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002197 f = self.open(support.TESTFN, encoding='test_decoder')
2198 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002199 decoded = f.read()
2200 f.close()
2201
Neal Norwitze2b07052008-03-18 19:52:05 +00002202 for i in range(min_pos, len(decoded) + 1): # seek positions
2203 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002204 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002205 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002206 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002207 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002208 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002209 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002210 f.close()
2211
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002212 # Enable the test decoder.
2213 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002214
2215 # Run the tests.
2216 try:
2217 # Try each test case.
2218 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002219 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002220
2221 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002222 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2223 offset = CHUNK_SIZE - len(input)//2
2224 prefix = b'.'*offset
2225 # Don't bother seeking into the prefix (takes too long).
2226 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002227 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002228
2229 # Ensure our test decoder won't interfere with subsequent tests.
2230 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002231 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002232
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002233 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002234 data = "1234567890"
2235 tests = ("utf-16",
2236 "utf-16-le",
2237 "utf-16-be",
2238 "utf-32",
2239 "utf-32-le",
2240 "utf-32-be")
2241 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002242 buf = self.BytesIO()
2243 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002244 # Check if the BOM is written only once (see issue1753).
2245 f.write(data)
2246 f.write(data)
2247 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002248 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002249 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002250 self.assertEqual(f.read(), data * 2)
2251 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002252
Benjamin Petersona1b49012009-03-31 23:11:32 +00002253 def test_unreadable(self):
2254 class UnReadable(self.BytesIO):
2255 def readable(self):
2256 return False
2257 txt = self.TextIOWrapper(UnReadable())
2258 self.assertRaises(IOError, txt.read)
2259
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002260 def test_read_one_by_one(self):
2261 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002262 reads = ""
2263 while True:
2264 c = txt.read(1)
2265 if not c:
2266 break
2267 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002268 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002269
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002270 def test_readlines(self):
2271 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2272 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2273 txt.seek(0)
2274 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2275 txt.seek(0)
2276 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2277
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002278 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002279 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002280 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002281 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002282 reads = ""
2283 while True:
2284 c = txt.read(128)
2285 if not c:
2286 break
2287 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002288 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002289
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002290 def test_writelines(self):
2291 l = ['ab', 'cd', 'ef']
2292 buf = self.BytesIO()
2293 txt = self.TextIOWrapper(buf)
2294 txt.writelines(l)
2295 txt.flush()
2296 self.assertEqual(buf.getvalue(), b'abcdef')
2297
2298 def test_writelines_userlist(self):
2299 l = UserList(['ab', 'cd', 'ef'])
2300 buf = self.BytesIO()
2301 txt = self.TextIOWrapper(buf)
2302 txt.writelines(l)
2303 txt.flush()
2304 self.assertEqual(buf.getvalue(), b'abcdef')
2305
2306 def test_writelines_error(self):
2307 txt = self.TextIOWrapper(self.BytesIO())
2308 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2309 self.assertRaises(TypeError, txt.writelines, None)
2310 self.assertRaises(TypeError, txt.writelines, b'abc')
2311
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002312 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002313 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002314
2315 # read one char at a time
2316 reads = ""
2317 while True:
2318 c = txt.read(1)
2319 if not c:
2320 break
2321 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002322 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002323
2324 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002325 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002326 txt._CHUNK_SIZE = 4
2327
2328 reads = ""
2329 while True:
2330 c = txt.read(4)
2331 if not c:
2332 break
2333 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002334 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002335
2336 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002337 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002338 txt._CHUNK_SIZE = 4
2339
2340 reads = txt.read(4)
2341 reads += txt.read(4)
2342 reads += txt.readline()
2343 reads += txt.readline()
2344 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002345 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002346
2347 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002348 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002349 txt._CHUNK_SIZE = 4
2350
2351 reads = txt.read(4)
2352 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002353 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002354
2355 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002356 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002357 txt._CHUNK_SIZE = 4
2358
2359 reads = txt.read(4)
2360 pos = txt.tell()
2361 txt.seek(0)
2362 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002363 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002364
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002365 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002366 buffer = self.BytesIO(self.testdata)
2367 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002368
2369 self.assertEqual(buffer.seekable(), txt.seekable())
2370
Antoine Pitroue4501852009-05-14 18:55:55 +00002371 def test_append_bom(self):
2372 # The BOM is not written again when appending to a non-empty file
2373 filename = support.TESTFN
2374 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2375 with self.open(filename, 'w', encoding=charset) as f:
2376 f.write('aaa')
2377 pos = f.tell()
2378 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002379 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002380
2381 with self.open(filename, 'a', encoding=charset) as f:
2382 f.write('xxx')
2383 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002384 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002385
2386 def test_seek_bom(self):
2387 # Same test, but when seeking manually
2388 filename = support.TESTFN
2389 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2390 with self.open(filename, 'w', encoding=charset) as f:
2391 f.write('aaa')
2392 pos = f.tell()
2393 with self.open(filename, 'r+', encoding=charset) as f:
2394 f.seek(pos)
2395 f.write('zzz')
2396 f.seek(0)
2397 f.write('bbb')
2398 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002399 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002400
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002401 def test_errors_property(self):
2402 with self.open(support.TESTFN, "w") as f:
2403 self.assertEqual(f.errors, "strict")
2404 with self.open(support.TESTFN, "w", errors="replace") as f:
2405 self.assertEqual(f.errors, "replace")
2406
Victor Stinner45df8202010-04-28 22:31:17 +00002407 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002408 def test_threads_write(self):
2409 # Issue6750: concurrent writes could duplicate data
2410 event = threading.Event()
2411 with self.open(support.TESTFN, "w", buffering=1) as f:
2412 def run(n):
2413 text = "Thread%03d\n" % n
2414 event.wait()
2415 f.write(text)
2416 threads = [threading.Thread(target=lambda n=x: run(n))
2417 for x in range(20)]
2418 for t in threads:
2419 t.start()
2420 time.sleep(0.02)
2421 event.set()
2422 for t in threads:
2423 t.join()
2424 with self.open(support.TESTFN) as f:
2425 content = f.read()
2426 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002427 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002428
Antoine Pitrou6be88762010-05-03 16:48:20 +00002429 def test_flush_error_on_close(self):
2430 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2431 def bad_flush():
2432 raise IOError()
2433 txt.flush = bad_flush
2434 self.assertRaises(IOError, txt.close) # exception not swallowed
2435
2436 def test_multi_close(self):
2437 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2438 txt.close()
2439 txt.close()
2440 txt.close()
2441 self.assertRaises(ValueError, txt.flush)
2442
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002443 def test_unseekable(self):
2444 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2445 self.assertRaises(self.UnsupportedOperation, txt.tell)
2446 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2447
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002448 def test_readonly_attributes(self):
2449 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2450 buf = self.BytesIO(self.testdata)
2451 with self.assertRaises(AttributeError):
2452 txt.buffer = buf
2453
Antoine Pitroue96ec682011-07-23 21:46:35 +02002454 def test_rawio(self):
2455 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
2456 # that subprocess.Popen() can have the required unbuffered
2457 # semantics with universal_newlines=True.
2458 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2459 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
2460 # Reads
2461 self.assertEqual(txt.read(4), 'abcd')
2462 self.assertEqual(txt.readline(), 'efghi\n')
2463 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
2464
2465 def test_rawio_write_through(self):
2466 # Issue #12591: with write_through=True, writes don't need a flush
2467 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2468 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
2469 write_through=True)
2470 txt.write('1')
2471 txt.write('23\n4')
2472 txt.write('5')
2473 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2474
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002475class CTextIOWrapperTest(TextIOWrapperTest):
2476
2477 def test_initialization(self):
2478 r = self.BytesIO(b"\xc3\xa9\n\n")
2479 b = self.BufferedReader(r, 1000)
2480 t = self.TextIOWrapper(b)
2481 self.assertRaises(TypeError, t.__init__, b, newline=42)
2482 self.assertRaises(ValueError, t.read)
2483 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2484 self.assertRaises(ValueError, t.read)
2485
2486 def test_garbage_collection(self):
2487 # C TextIOWrapper objects are collected, and collecting them flushes
2488 # all data to disk.
2489 # The Python version has __del__, so it ends in gc.garbage instead.
2490 rawio = io.FileIO(support.TESTFN, "wb")
2491 b = self.BufferedWriter(rawio)
2492 t = self.TextIOWrapper(b, encoding="ascii")
2493 t.write("456def")
2494 t.x = t
2495 wr = weakref.ref(t)
2496 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002497 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002498 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002499 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002500 self.assertEqual(f.read(), b"456def")
2501
Charles-François Natali42c28cd2011-10-05 19:53:43 +02002502 def test_rwpair_cleared_before_textio(self):
2503 # Issue 13070: TextIOWrapper's finalization would crash when called
2504 # after the reference to the underlying BufferedRWPair's writer got
2505 # cleared by the GC.
2506 for i in range(1000):
2507 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2508 t1 = self.TextIOWrapper(b1, encoding="ascii")
2509 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2510 t2 = self.TextIOWrapper(b2, encoding="ascii")
2511 # circular references
2512 t1.buddy = t2
2513 t2.buddy = t1
2514 support.gc_collect()
2515
2516
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002517class PyTextIOWrapperTest(TextIOWrapperTest):
2518 pass
2519
2520
2521class IncrementalNewlineDecoderTest(unittest.TestCase):
2522
2523 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002524 # UTF-8 specific tests for a newline decoder
2525 def _check_decode(b, s, **kwargs):
2526 # We exercise getstate() / setstate() as well as decode()
2527 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002528 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002529 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002530 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002531
Antoine Pitrou180a3362008-12-14 16:36:46 +00002532 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002533
Antoine Pitrou180a3362008-12-14 16:36:46 +00002534 _check_decode(b'\xe8', "")
2535 _check_decode(b'\xa2', "")
2536 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002537
Antoine Pitrou180a3362008-12-14 16:36:46 +00002538 _check_decode(b'\xe8', "")
2539 _check_decode(b'\xa2', "")
2540 _check_decode(b'\x88', "\u8888")
2541
2542 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002543 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2544
Antoine Pitrou180a3362008-12-14 16:36:46 +00002545 decoder.reset()
2546 _check_decode(b'\n', "\n")
2547 _check_decode(b'\r', "")
2548 _check_decode(b'', "\n", final=True)
2549 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002550
Antoine Pitrou180a3362008-12-14 16:36:46 +00002551 _check_decode(b'\r', "")
2552 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002553
Antoine Pitrou180a3362008-12-14 16:36:46 +00002554 _check_decode(b'\r\r\n', "\n\n")
2555 _check_decode(b'\r', "")
2556 _check_decode(b'\r', "\n")
2557 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002558
Antoine Pitrou180a3362008-12-14 16:36:46 +00002559 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2560 _check_decode(b'\xe8\xa2\x88', "\u8888")
2561 _check_decode(b'\n', "\n")
2562 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2563 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002564
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002565 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002566 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002567 if encoding is not None:
2568 encoder = codecs.getincrementalencoder(encoding)()
2569 def _decode_bytewise(s):
2570 # Decode one byte at a time
2571 for b in encoder.encode(s):
2572 result.append(decoder.decode(bytes([b])))
2573 else:
2574 encoder = None
2575 def _decode_bytewise(s):
2576 # Decode one char at a time
2577 for c in s:
2578 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002579 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002580 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002581 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00002582 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002583 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002584 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002585 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002586 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002587 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002588 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002589 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002590 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002591 input = "abc"
2592 if encoder is not None:
2593 encoder.reset()
2594 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002595 self.assertEqual(decoder.decode(input), "abc")
2596 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002597
2598 def test_newline_decoder(self):
2599 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002600 # None meaning the IncrementalNewlineDecoder takes unicode input
2601 # rather than bytes input
2602 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002603 'utf-16', 'utf-16-le', 'utf-16-be',
2604 'utf-32', 'utf-32-le', 'utf-32-be',
2605 )
2606 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002607 decoder = enc and codecs.getincrementaldecoder(enc)()
2608 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2609 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002610 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002611 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2612 self.check_newline_decoding_utf8(decoder)
2613
Antoine Pitrou66913e22009-03-06 23:40:56 +00002614 def test_newline_bytes(self):
2615 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2616 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002617 self.assertEqual(dec.newlines, None)
2618 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2619 self.assertEqual(dec.newlines, None)
2620 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2621 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00002622 dec = self.IncrementalNewlineDecoder(None, translate=False)
2623 _check(dec)
2624 dec = self.IncrementalNewlineDecoder(None, translate=True)
2625 _check(dec)
2626
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002627class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2628 pass
2629
2630class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2631 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002632
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002633
Guido van Rossum01a27522007-03-07 01:00:12 +00002634# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002635
Guido van Rossum5abbf752007-08-27 17:39:33 +00002636class MiscIOTest(unittest.TestCase):
2637
Barry Warsaw40e82462008-11-20 20:14:50 +00002638 def tearDown(self):
2639 support.unlink(support.TESTFN)
2640
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002641 def test___all__(self):
2642 for name in self.io.__all__:
2643 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002644 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002645 if name == "open":
2646 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002647 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002648 self.assertTrue(issubclass(obj, Exception), name)
2649 elif not name.startswith("SEEK_"):
2650 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002651
Barry Warsaw40e82462008-11-20 20:14:50 +00002652 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002653 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002654 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002655 f.close()
2656
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002657 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002658 self.assertEqual(f.name, support.TESTFN)
2659 self.assertEqual(f.buffer.name, support.TESTFN)
2660 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2661 self.assertEqual(f.mode, "U")
2662 self.assertEqual(f.buffer.mode, "rb")
2663 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002664 f.close()
2665
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002666 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002667 self.assertEqual(f.mode, "w+")
2668 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2669 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002670
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002671 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002672 self.assertEqual(g.mode, "wb")
2673 self.assertEqual(g.raw.mode, "wb")
2674 self.assertEqual(g.name, f.fileno())
2675 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00002676 f.close()
2677 g.close()
2678
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002679 def test_io_after_close(self):
2680 for kwargs in [
2681 {"mode": "w"},
2682 {"mode": "wb"},
2683 {"mode": "w", "buffering": 1},
2684 {"mode": "w", "buffering": 2},
2685 {"mode": "wb", "buffering": 0},
2686 {"mode": "r"},
2687 {"mode": "rb"},
2688 {"mode": "r", "buffering": 1},
2689 {"mode": "r", "buffering": 2},
2690 {"mode": "rb", "buffering": 0},
2691 {"mode": "w+"},
2692 {"mode": "w+b"},
2693 {"mode": "w+", "buffering": 1},
2694 {"mode": "w+", "buffering": 2},
2695 {"mode": "w+b", "buffering": 0},
2696 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002697 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002698 f.close()
2699 self.assertRaises(ValueError, f.flush)
2700 self.assertRaises(ValueError, f.fileno)
2701 self.assertRaises(ValueError, f.isatty)
2702 self.assertRaises(ValueError, f.__iter__)
2703 if hasattr(f, "peek"):
2704 self.assertRaises(ValueError, f.peek, 1)
2705 self.assertRaises(ValueError, f.read)
2706 if hasattr(f, "read1"):
2707 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02002708 if hasattr(f, "readall"):
2709 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002710 if hasattr(f, "readinto"):
2711 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2712 self.assertRaises(ValueError, f.readline)
2713 self.assertRaises(ValueError, f.readlines)
2714 self.assertRaises(ValueError, f.seek, 0)
2715 self.assertRaises(ValueError, f.tell)
2716 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002717 self.assertRaises(ValueError, f.write,
2718 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002719 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002720 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002721
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002722 def test_blockingioerror(self):
2723 # Various BlockingIOError issues
2724 self.assertRaises(TypeError, self.BlockingIOError)
2725 self.assertRaises(TypeError, self.BlockingIOError, 1)
2726 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2727 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2728 b = self.BlockingIOError(1, "")
2729 self.assertEqual(b.characters_written, 0)
2730 class C(str):
2731 pass
2732 c = C("")
2733 b = self.BlockingIOError(1, c)
2734 c.b = b
2735 b.c = c
2736 wr = weakref.ref(c)
2737 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002738 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002739 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002740
2741 def test_abcs(self):
2742 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002743 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2744 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2745 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2746 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002747
2748 def _check_abc_inheritance(self, abcmodule):
2749 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002750 self.assertIsInstance(f, abcmodule.IOBase)
2751 self.assertIsInstance(f, abcmodule.RawIOBase)
2752 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2753 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002754 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002755 self.assertIsInstance(f, abcmodule.IOBase)
2756 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2757 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2758 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002759 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002760 self.assertIsInstance(f, abcmodule.IOBase)
2761 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2762 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2763 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002764
2765 def test_abc_inheritance(self):
2766 # Test implementations inherit from their respective ABCs
2767 self._check_abc_inheritance(self)
2768
2769 def test_abc_inheritance_official(self):
2770 # Test implementations inherit from the official ABCs of the
2771 # baseline "io" module.
2772 self._check_abc_inheritance(io)
2773
Antoine Pitroue033e062010-10-29 10:38:18 +00002774 def _check_warn_on_dealloc(self, *args, **kwargs):
2775 f = open(*args, **kwargs)
2776 r = repr(f)
2777 with self.assertWarns(ResourceWarning) as cm:
2778 f = None
2779 support.gc_collect()
2780 self.assertIn(r, str(cm.warning.args[0]))
2781
2782 def test_warn_on_dealloc(self):
2783 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
2784 self._check_warn_on_dealloc(support.TESTFN, "wb")
2785 self._check_warn_on_dealloc(support.TESTFN, "w")
2786
2787 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
2788 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00002789 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00002790 for fd in fds:
2791 try:
2792 os.close(fd)
2793 except EnvironmentError as e:
2794 if e.errno != errno.EBADF:
2795 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00002796 self.addCleanup(cleanup_fds)
2797 r, w = os.pipe()
2798 fds += r, w
2799 self._check_warn_on_dealloc(r, *args, **kwargs)
2800 # When using closefd=False, there's no warning
2801 r, w = os.pipe()
2802 fds += r, w
2803 with warnings.catch_warnings(record=True) as recorded:
2804 open(r, *args, closefd=False, **kwargs)
2805 support.gc_collect()
2806 self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00002807
2808 def test_warn_on_dealloc_fd(self):
2809 self._check_warn_on_dealloc_fd("rb", buffering=0)
2810 self._check_warn_on_dealloc_fd("rb")
2811 self._check_warn_on_dealloc_fd("r")
2812
2813
Antoine Pitrou243757e2010-11-05 21:15:39 +00002814 def test_pickling(self):
2815 # Pickling file objects is forbidden
2816 for kwargs in [
2817 {"mode": "w"},
2818 {"mode": "wb"},
2819 {"mode": "wb", "buffering": 0},
2820 {"mode": "r"},
2821 {"mode": "rb"},
2822 {"mode": "rb", "buffering": 0},
2823 {"mode": "w+"},
2824 {"mode": "w+b"},
2825 {"mode": "w+b", "buffering": 0},
2826 ]:
2827 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
2828 with self.open(support.TESTFN, **kwargs) as f:
2829 self.assertRaises(TypeError, pickle.dumps, f, protocol)
2830
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01002831 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2832 def test_nonblock_pipe_write_bigbuf(self):
2833 self._test_nonblock_pipe_write(16*1024)
2834
2835 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2836 def test_nonblock_pipe_write_smallbuf(self):
2837 self._test_nonblock_pipe_write(1024)
2838
2839 def _set_non_blocking(self, fd):
2840 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
2841 self.assertNotEqual(flags, -1)
2842 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
2843 self.assertEqual(res, 0)
2844
2845 def _test_nonblock_pipe_write(self, bufsize):
2846 sent = []
2847 received = []
2848 r, w = os.pipe()
2849 self._set_non_blocking(r)
2850 self._set_non_blocking(w)
2851
2852 # To exercise all code paths in the C implementation we need
2853 # to play with buffer sizes. For instance, if we choose a
2854 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
2855 # then we will never get a partial write of the buffer.
2856 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
2857 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
2858
2859 with rf, wf:
2860 for N in 9999, 73, 7574:
2861 try:
2862 i = 0
2863 while True:
2864 msg = bytes([i % 26 + 97]) * N
2865 sent.append(msg)
2866 wf.write(msg)
2867 i += 1
2868
2869 except self.BlockingIOError as e:
2870 self.assertEqual(e.args[0], errno.EAGAIN)
2871 sent[-1] = sent[-1][:e.characters_written]
2872 received.append(rf.read())
2873 msg = b'BLOCKED'
2874 wf.write(msg)
2875 sent.append(msg)
2876
2877 while True:
2878 try:
2879 wf.flush()
2880 break
2881 except self.BlockingIOError as e:
2882 self.assertEqual(e.args[0], errno.EAGAIN)
2883 self.assertEqual(e.characters_written, 0)
2884 received.append(rf.read())
2885
2886 received += iter(rf.read, None)
2887
2888 sent, received = b''.join(sent), b''.join(received)
2889 self.assertTrue(sent == received)
2890 self.assertTrue(wf.closed)
2891 self.assertTrue(rf.closed)
2892
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002893class CMiscIOTest(MiscIOTest):
2894 io = io
2895
2896class PyMiscIOTest(MiscIOTest):
2897 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002898
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002899
2900@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2901class SignalsTest(unittest.TestCase):
2902
2903 def setUp(self):
2904 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2905
2906 def tearDown(self):
2907 signal.signal(signal.SIGALRM, self.oldalrm)
2908
2909 def alarm_interrupt(self, sig, frame):
2910 1/0
2911
2912 @unittest.skipUnless(threading, 'Threading required for this test.')
Victor Stinnercd1aa0d2011-07-04 11:48:17 +02002913 @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
2914 'issue #12429: skip test on FreeBSD <= 7')
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002915 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2916 """Check that a partial write, when it gets interrupted, properly
Antoine Pitroud843c2d2011-02-25 21:34:39 +00002917 invokes the signal handler, and bubbles up the exception raised
2918 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002919 read_results = []
2920 def _read():
2921 s = os.read(r, 1)
2922 read_results.append(s)
2923 t = threading.Thread(target=_read)
2924 t.daemon = True
2925 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00002926 fdopen_kwargs["closefd"] = False
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002927 try:
2928 wio = self.io.open(w, **fdopen_kwargs)
2929 t.start()
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07002930 signal.alarm(1)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002931 # Fill the pipe enough that the write will be blocking.
2932 # It will be interrupted by the timer armed above. Since the
2933 # other thread has read one byte, the low-level write will
2934 # return with a successful (partial) result rather than an EINTR.
2935 # The buffered IO layer must check for pending signal
2936 # handlers, which in this case will invoke alarm_interrupt().
2937 self.assertRaises(ZeroDivisionError,
2938 wio.write, item * (1024 * 1024))
2939 t.join()
2940 # We got one byte, get another one and check that it isn't a
2941 # repeat of the first one.
2942 read_results.append(os.read(r, 1))
2943 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2944 finally:
2945 os.close(w)
2946 os.close(r)
2947 # This is deliberate. If we didn't close the file descriptor
2948 # before closing wio, wio would try to flush its internal
2949 # buffer, and block again.
2950 try:
2951 wio.close()
2952 except IOError as e:
2953 if e.errno != errno.EBADF:
2954 raise
2955
2956 def test_interrupted_write_unbuffered(self):
2957 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2958
2959 def test_interrupted_write_buffered(self):
2960 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2961
2962 def test_interrupted_write_text(self):
2963 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2964
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002965 def check_reentrant_write(self, data, **fdopen_kwargs):
2966 def on_alarm(*args):
2967 # Will be called reentrantly from the same thread
2968 wio.write(data)
2969 1/0
2970 signal.signal(signal.SIGALRM, on_alarm)
2971 r, w = os.pipe()
2972 wio = self.io.open(w, **fdopen_kwargs)
2973 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07002974 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002975 # Either the reentrant call to wio.write() fails with RuntimeError,
2976 # or the signal handler raises ZeroDivisionError.
2977 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
2978 while 1:
2979 for i in range(100):
2980 wio.write(data)
2981 wio.flush()
2982 # Make sure the buffer doesn't fill up and block further writes
2983 os.read(r, len(data) * 100)
2984 exc = cm.exception
2985 if isinstance(exc, RuntimeError):
2986 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
2987 finally:
2988 wio.close()
2989 os.close(r)
2990
2991 def test_reentrant_write_buffered(self):
2992 self.check_reentrant_write(b"xy", mode="wb")
2993
2994 def test_reentrant_write_text(self):
2995 self.check_reentrant_write("xy", mode="w", encoding="ascii")
2996
Antoine Pitroud843c2d2011-02-25 21:34:39 +00002997 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
2998 """Check that a buffered read, when it gets interrupted (either
2999 returning a partial result or EINTR), properly invokes the signal
3000 handler and retries if the latter returned successfully."""
3001 r, w = os.pipe()
3002 fdopen_kwargs["closefd"] = False
3003 def alarm_handler(sig, frame):
3004 os.write(w, b"bar")
3005 signal.signal(signal.SIGALRM, alarm_handler)
3006 try:
3007 rio = self.io.open(r, **fdopen_kwargs)
3008 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003009 signal.alarm(1)
Antoine Pitroud843c2d2011-02-25 21:34:39 +00003010 # Expected behaviour:
3011 # - first raw read() returns partial b"foo"
3012 # - second raw read() returns EINTR
3013 # - third raw read() returns b"bar"
3014 self.assertEqual(decode(rio.read(6)), "foobar")
3015 finally:
3016 rio.close()
3017 os.close(w)
3018 os.close(r)
3019
Antoine Pitrou20db5112011-08-19 20:32:34 +02003020 def test_interrupted_read_retry_buffered(self):
Antoine Pitroud843c2d2011-02-25 21:34:39 +00003021 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3022 mode="rb")
3023
Antoine Pitrou20db5112011-08-19 20:32:34 +02003024 def test_interrupted_read_retry_text(self):
Antoine Pitroud843c2d2011-02-25 21:34:39 +00003025 self.check_interrupted_read_retry(lambda x: x,
3026 mode="r")
3027
3028 @unittest.skipUnless(threading, 'Threading required for this test.')
3029 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3030 """Check that a buffered write, when it gets interrupted (either
3031 returning a partial result or EINTR), properly invokes the signal
3032 handler and retries if the latter returned successfully."""
3033 select = support.import_module("select")
3034 # A quantity that exceeds the buffer size of an anonymous pipe's
3035 # write end.
3036 N = 1024 * 1024
3037 r, w = os.pipe()
3038 fdopen_kwargs["closefd"] = False
3039 # We need a separate thread to read from the pipe and allow the
3040 # write() to finish. This thread is started after the SIGALRM is
3041 # received (forcing a first EINTR in write()).
3042 read_results = []
3043 write_finished = False
3044 def _read():
3045 while not write_finished:
3046 while r in select.select([r], [], [], 1.0)[0]:
3047 s = os.read(r, 1024)
3048 read_results.append(s)
3049 t = threading.Thread(target=_read)
3050 t.daemon = True
3051 def alarm1(sig, frame):
3052 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003053 signal.alarm(1)
Antoine Pitroud843c2d2011-02-25 21:34:39 +00003054 def alarm2(sig, frame):
3055 t.start()
3056 signal.signal(signal.SIGALRM, alarm1)
3057 try:
3058 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003059 signal.alarm(1)
Antoine Pitroud843c2d2011-02-25 21:34:39 +00003060 # Expected behaviour:
3061 # - first raw write() is partial (because of the limited pipe buffer
3062 # and the first alarm)
3063 # - second raw write() returns EINTR (because of the second alarm)
3064 # - subsequent write()s are successful (either partial or complete)
3065 self.assertEqual(N, wio.write(item * N))
3066 wio.flush()
3067 write_finished = True
3068 t.join()
3069 self.assertEqual(N, sum(len(x) for x in read_results))
3070 finally:
3071 write_finished = True
3072 os.close(w)
3073 os.close(r)
3074 # This is deliberate. If we didn't close the file descriptor
3075 # before closing wio, wio would try to flush its internal
3076 # buffer, and could block (in case of failure).
3077 try:
3078 wio.close()
3079 except IOError as e:
3080 if e.errno != errno.EBADF:
3081 raise
3082
Antoine Pitrou20db5112011-08-19 20:32:34 +02003083 def test_interrupted_write_retry_buffered(self):
Antoine Pitroud843c2d2011-02-25 21:34:39 +00003084 self.check_interrupted_write_retry(b"x", mode="wb")
3085
Antoine Pitrou20db5112011-08-19 20:32:34 +02003086 def test_interrupted_write_retry_text(self):
Antoine Pitroud843c2d2011-02-25 21:34:39 +00003087 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3088
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003089
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003090class CSignalsTest(SignalsTest):
3091 io = io
3092
3093class PySignalsTest(SignalsTest):
3094 io = pyio
3095
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003096 # Handling reentrancy issues would slow down _pyio even more, so the
3097 # tests are disabled.
3098 test_reentrant_write_buffered = None
3099 test_reentrant_write_text = None
3100
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003101
Guido van Rossum28524c72007-02-27 05:47:44 +00003102def test_main():
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003103 tests = (CIOTest, PyIOTest,
3104 CBufferedReaderTest, PyBufferedReaderTest,
3105 CBufferedWriterTest, PyBufferedWriterTest,
3106 CBufferedRWPairTest, PyBufferedRWPairTest,
3107 CBufferedRandomTest, PyBufferedRandomTest,
3108 StatefulIncrementalDecoderTest,
3109 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3110 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003111 CMiscIOTest, PyMiscIOTest,
3112 CSignalsTest, PySignalsTest,
3113 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003114
3115 # Put the namespaces of the IO module we are testing and some useful mock
3116 # classes in the __dict__ of each test.
3117 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou328ec742010-09-14 18:37:24 +00003118 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003119 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3120 c_io_ns = {name : getattr(io, name) for name in all_members}
3121 py_io_ns = {name : getattr(pyio, name) for name in all_members}
3122 globs = globals()
3123 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3124 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3125 # Avoid turning open into a bound method.
3126 py_io_ns["open"] = pyio.OpenWrapper
3127 for test in tests:
3128 if test.__name__.startswith("C"):
3129 for name, obj in c_io_ns.items():
3130 setattr(test, name, obj)
3131 elif test.__name__.startswith("Py"):
3132 for name, obj in py_io_ns.items():
3133 setattr(test, name, obj)
3134
3135 support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00003136
3137if __name__ == "__main__":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003138 test_main()