blob: df7ca15f58fe06a0070f4434f8807b864654d9eb [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as an attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import random
Guido van Rossum28524c72007-02-27 05:47:44 +000027import unittest
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000028import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000029import abc
Antoine Pitroub46b9d52010-08-21 19:09:32 +000030import signal
31import errno
Antoine Pitroue033e062010-10-29 10:38:18 +000032import warnings
Antoine Pitrou243757e2010-11-05 21:15:39 +000033import pickle
Georg Brandl1b37e872010-03-14 10:45:50 +000034from itertools import cycle, count
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000035from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000036from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000037
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000038import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000039import io # C implementation of io
40import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000041try:
42 import threading
43except ImportError:
44 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010045try:
46 import fcntl
47except ImportError:
48 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000049
def _default_chunk_size():
    """Return the chunk size used by a text file opened with open().

    This module's own source file is opened in text mode purely to obtain a
    wrapper object whose ``_CHUNK_SIZE`` attribute can be read.
    """
    text_file = open(__file__, "r", encoding="latin1")
    try:
        chunk_size = text_file._CHUNK_SIZE
    finally:
        text_file.close()
    return chunk_size
54
55
class MockRawIOWithoutRead:
    """Scripted raw stream that deliberately defines no read() method.

    The concrete subclasses below combine it with a RawIOBase class, so the
    inherited RawIOBase.read() -- which calls readinto() -- is what gets
    exercised.  The entries of *read_stack* are handed out in order by
    readinto(); a ``None`` entry makes readinto() return ``None`` once.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        # Counters: every readinto() call, and those made after the scripted
        # data ran out.
        self._reads = 0
        self._extraneous_reads = 0

    def write(self, b):
        """Record a bytes copy of *b* and report len(b) as written."""
        copy = bytes(b)
        self._write_stack.append(copy)
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        # wrong but we gotta return something
        return 0

    def tell(self):
        # same comment as above
        return 0

    def readinto(self, buf):
        """Copy the head of the scripted data into *buf*.

        Returns the number of bytes copied, ``None`` when the next scripted
        entry is ``None``, or 0 once the script is exhausted.
        """
        self._reads += 1
        capacity = len(buf)
        if not self._read_stack:
            self._extraneous_reads += 1
            return 0
        chunk = self._read_stack[0]
        if chunk is None:
            self._read_stack.pop(0)
            return None
        size = len(chunk)
        if size > capacity:
            # Only part of the chunk fits: keep the remainder for later calls.
            buf[:] = chunk[:capacity]
            self._read_stack[0] = chunk[capacity:]
            return capacity
        self._read_stack.pop(0)
        buf[:size] = chunk
        return size

    def truncate(self, pos=None):
        return pos


class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead combined with io.RawIOBase."""


class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead combined with pyio.RawIOBase."""
117
118
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus an explicit read() serving scripted chunks."""

    def read(self, n=None):
        """Return the next scripted chunk whole, ignoring *n*.

        Once the script is exhausted the call is counted as extraneous and
        b"" is returned.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # An empty script is the only expected failure; any other error
            # is a genuine bug and must not be swallowed.
            self._extraneous_reads += 1
            return b""


class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO combined with io.RawIOBase."""


class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO combined with pyio.RawIOBase."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000134
Guido van Rossuma9e20242007-03-08 00:43:48 +0000135
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods report deliberately bogus results."""

    def write(self, b):
        # Claim twice as many bytes as were really recorded.
        reported = super().write(b)
        return reported * 2

    def read(self, n=None):
        # Hand back the scripted chunk duplicated.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Fill the buffer normally, then report more than it can hold.
        super().readinto(buf)
        return 5 * len(buf)


class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO combined with io.RawIOBase."""


class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO combined with pyio.RawIOBase."""
158
159
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() call raises IOError."""

    closed = 0

    def close(self):
        # Later calls are silent: the flag is already set by the first one.
        if self.closed:
            return
        self.closed = 1
        raise IOError


class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO combined with io.RawIOBase."""


class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO combined with pyio.RawIOBase."""
173
174
class MockFileIO:
    """Mixin that logs the result size of every read()/readinto() call.

    ``read_history`` receives the length of each read() result (``None`` when
    the result is ``None``) and the return value of each readinto() call.
    """

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        data = super().read(n)
        if data is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(data))
        return data

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count


class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO combined with io.BytesIO."""


class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO combined with pyio.BytesIO."""
196
197
class MockUnseekableIO:
    """Mixin that reports itself unseekable and refuses seek() and tell().

    The exception raised is ``self.UnsupportedOperation``, which each
    concrete subclass supplies as a class attribute.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def tell(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error


class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    # Exception type raised by the mixin's seek() and tell().
    UnsupportedOperation = io.UnsupportedOperation


class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    # Exception type raised by the mixin's seek() and tell().
    UnsupportedOperation = pyio.UnsupportedOperation
213
214
class MockNonBlockWriterIO:
    """Raw writer mock that can act as if a write would block.

    Written data is collected in ``_write_stack``.  After block_on(char), a
    write() whose data contains *char* is cut short just before it.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and empty the record."""
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Record *b*, or only the part of it preceding the blocker.

        Returns the number of bytes recorded.  When the blocker is the very
        first byte, nothing is recorded, the blocker is cancelled and
        ``None`` is returned.
        """
        payload = bytes(b)
        blocker = self._blocker_char
        if blocker:
            position = payload.find(blocker)
            if position > 0:
                # write data up to the first blocker
                self._write_stack.append(payload[:position])
                return position
            if position == 0:
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        self._write_stack.append(payload)
        return len(payload)


class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    BlockingIOError = io.BlockingIOError


class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    BlockingIOError = pyio.BlockingIOError
264
Guido van Rossuma9e20242007-03-08 00:43:48 +0000265
class IOTest(unittest.TestCase):
    """Tests for open(), FileIO, BytesIO and the IOBase class hierarchy.

    The implementation under test is reached only through attributes such as
    ``self.open``, ``self.FileIO``, ``self.BytesIO``, ``self.IOBase`` and
    ``self.UnsupportedOperation``.  None of them is defined in this class;
    presumably they are injected elsewhere in the file for the C and Python
    variants (TODO confirm).
    """

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        """Run a fixed write/seek/truncate sequence on the writable file *f*.

        test_raw_bytes_io checks that the resulting content is
        b"hello world\\n".
        """
        self.assertEqual(f.write(b"blah."), 5)
        # truncate() must not move the file position.
        f.truncate(0)
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Run a fixed read/seek sequence on *f*, which holds b"hello world\\n".

        With *buffered* true, argument-less read() calls are checked as well.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        # Only two bytes are left, so the tail of the buffer is untouched.
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by the large-file tests.
    LARGE = 2**31

    def large_file_ops(self, f):
        """Check seek/tell/write/truncate on *f* at offsets around LARGE."""
        # assertTrue rather than a bare assert, so the checks survive -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        exc = self.UnsupportedOperation
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(exc, fp.read)
                self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "wb", buffering=0) as fp:
            self.assertRaises(exc, fp.read)
            self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "rb", buffering=0) as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(exc, fp.write, "blah")
            self.assertRaises(exc, fp.writelines, ["blah\n"])
            # Non-zero seeking from current or end pos
            self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
            self.assertRaises(exc, fp.seek, -1, self.SEEK_END)

    def test_raw_file_io(self):
        # Unbuffered binary files: capability flags, then the shared
        # write/read sequences.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        # Same as test_raw_file_io, with default buffering.
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        # The shared write/read sequences, applied to an in-memory stream.
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space,
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                # Report a real skip instead of printing and passing silently.
                self.skipTest(
                    "requires %d bytes and a long time on %s; use "
                    "'regrtest.py -u largefile test_io' to run it"
                    % (self.LARGE, sys.platform))
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # The file must be closed on leaving the with block, whether the
        # block ends normally or through an exception.
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        # Dropping the last reference to an unclosed FileIO subclass must run
        # __del__, close() and flush(), in that order, and keep the data.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        with support.check_warnings(('', ResourceWarning)):
            f = MyFileIO(support.TESTFN, "wb")
            f.write(b"xxx")
            del f
            support.gc_collect()
            self.assertEqual(record, [1, 2, 3])
            with self.open(support.TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        """Check the __del__/close()/flush() call order for a *base* subclass."""
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        # Data written before the file is closed must be readable afterwards.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        # write() must accept an array and report its size in bytes.
        a = array.array('i', range(10))
        n = len(a.tobytes())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        # A wrapper opened on a descriptor with closefd=False can be closed
        # on its own; reading from it afterwards must fail.
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r",
                          closefd=False)

    def test_closefd_attr(self):
        # The closefd argument must be visible on the underlying raw object.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        with support.check_warnings(('', ResourceWarning)):
            f = self.FileIO(support.TESTFN, "wb")
            f.write(b"abcxxx")
            # Self-reference: only the cycle collector can reclaim the object.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed

    def test_multi_close(self):
        # Closing repeatedly is harmless; using the closed file is an error.
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")

    def test_types_have_dict(self):
        # Instances of these types must expose a __dict__.
        test = (
            self.IOBase(),
            self.RawIOBase(),
            self.TextIOBase(),
            self.StringIO(),
            self.BytesIO()
        )
        for obj in test:
            self.assertTrue(hasattr(obj, "__dict__"))
631
class CIOTest(IOTest):
    """IOTest plus a regression test for a crash during finalization."""

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                return None

        # create an instance to populate the method cache
        MyIO()
        instance = MyIO()
        # The self-reference puts the instance in a cycle.
        instance.obj = instance
        ref = weakref.ref(instance)
        del MyIO
        del instance
        support.gc_collect()
        self.assertTrue(ref() is None, ref)


class PyIOTest(IOTest):
    """IOTest with no additions of its own."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000654
Guido van Rossuma9e20242007-03-08 00:43:48 +0000655
class CommonBufferedTests:
    # Mixin with the checks shared by the BufferedReader, BufferedWriter and
    # BufferedRandom test cases.  The concrete test class provides the type
    # under test as `self.tp`; the mock raw streams (`self.MockRawIO`,
    # `self.CloseFailureIO`, `self.MockUnseekableIO`) and
    # `self.UnsupportedOperation` are expected to be attached elsewhere.

    def test_detach(self):
        # detach() returns the wrapped raw stream; detaching twice fails.
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        self.assertIs(buffered.detach(), raw_stream)
        with self.assertRaises(ValueError):
            buffered.detach()

    def test_fileno(self):
        # fileno() is forwarded to the raw stream (the mock is expected to
        # report 42).
        buffered = self.tp(self.MockRawIO())

        self.assertEqual(42, buffered.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        buffered = self.tp(self.MockRawIO())
        # Invalid whence
        for bad_whence in (-1, 3):
            with self.assertRaises(ValueError):
                buffered.seek(0, bad_whence)

    def test_override_destructor(self):
        # When the object is collected, the overridden hooks must have been
        # called in this order: __del__ (1), close (2) and, for writable
        # objects only, flush (3).
        base = self.tp
        calls = []
        class MyBufferedIO(base):
            def __del__(self):
                calls.append(1)
                try:
                    parent_del = super().__del__
                except AttributeError:
                    # The base type has no __del__ to chain to.
                    return
                parent_del()
            def close(self):
                calls.append(2)
                super().close()
            def flush(self):
                calls.append(3)
                super().flush()
        buffered = MyBufferedIO(self.MockRawIO())
        # Must be queried before the object goes away.
        is_writable = buffered.writable()
        del buffered
        support.gc_collect()
        expected = [1, 2, 3] if is_writable else [1, 2]
        self.assertEqual(calls, expected)

    def test_context_manager(self):
        # Test usability as a context manager
        buffered = self.tp(self.MockRawIO())
        def enter_and_exit():
            with buffered:
                pass
        enter_and_exit()
        # The object should now be closed, and using it a second time should
        # raise a ValueError.
        with self.assertRaises(ValueError):
            enter_and_exit()

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        raw_stream = self.CloseFailureIO()
        def touch_missing_attribute():
            # The temporary buffered object is destroyed while the
            # AttributeError is propagating.
            self.tp(raw_stream).xyzzy
        with support.captured_output("stderr") as captured:
            self.assertRaises(AttributeError, touch_missing_attribute)
        output = captured.getvalue().strip()
        if not output:
            return
        # The destructor *may* have printed an unraisable error, check it
        self.assertEqual(len(output.splitlines()), 1)
        self.assertTrue(output.startswith("Exception IOError: "), output)
        self.assertTrue(output.endswith(" ignored"), output)

    def test_repr(self):
        # repr() shows the qualified type name, plus the raw stream's name
        # once it has one (str or bytes).
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(buffered), "<%s>" % clsname)
        named_cases = (
            ("dummy", "<%s name='dummy'>"),
            (b"dummy", "<%s name=b'dummy'>"),
        )
        for name, template in named_cases:
            raw_stream.name = name
            self.assertEqual(repr(buffered), template % clsname)

    def test_flush_error_on_close(self):
        raw_stream = self.MockRawIO()
        def failing_flush():
            raise IOError()
        raw_stream.flush = failing_flush
        buffered = self.tp(raw_stream)
        with self.assertRaises(IOError):  # exception not swallowed
            buffered.close()

    def test_multi_close(self):
        # Closing repeatedly is harmless; using the closed object is not.
        buffered = self.tp(self.MockRawIO())
        for _ in range(3):
            buffered.close()
        with self.assertRaises(ValueError):
            buffered.flush()

    def test_unseekable(self):
        buffered = self.tp(self.MockUnseekableIO(b"A" * 10))
        with self.assertRaises(self.UnsupportedOperation):
            buffered.tell()
        with self.assertRaises(self.UnsupportedOperation):
            buffered.seek(0)

    def test_readonly_attributes(self):
        # The `raw` attribute cannot be rebound.
        buffered = self.tp(self.MockRawIO())
        replacement = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buffered.raw = replacement
775
Guido van Rossum78892e42007-04-06 17:31:18 +0000776
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Read-side tests for the buffered type bound to `self.tp`.

    # Mode used by test_threads when it reopens the scratch file.
    read_mode = "rb"

    def test_constructor(self):
        raw_stream = self.MockRawIO([b"abc"])
        reader = self.tp(raw_stream)
        # Re-running __init__ with valid sizes must keep the object usable.
        reader.__init__(raw_stream)
        for size in (1024, 16):
            reader.__init__(raw_stream, buffer_size=size)
        self.assertEqual(b"abc", reader.read())
        # Non-positive buffer sizes are rejected.
        for size in (0, -16, -1):
            with self.assertRaises(ValueError):
                reader.__init__(raw_stream, buffer_size=size)
        raw_stream = self.MockRawIO([b"abc"])
        reader.__init__(raw_stream)
        self.assertEqual(b"abc", reader.read())

    def test_read(self):
        for size in (None, 7):
            reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
            self.assertEqual(b"abcdefg", reader.read(size))
        # Invalid args
        with self.assertRaises(ValueError):
            reader.read(-2)

    def test_read1(self):
        raw_stream = self.MockRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw_stream)
        self.assertEqual(b"a", reader.read(1))
        # (size given to read1, expected bytes, raw reads counted so far)
        steps = (
            (1, b"b", 1),
            (100, b"c", 1),
            (100, b"d", 2),
            (100, b"efg", 3),
            (100, b"", 4),
        )
        for size, expected, raw_reads in steps:
            self.assertEqual(expected, reader.read1(size))
            self.assertEqual(raw_stream._reads, raw_reads)
        # Invalid args
        with self.assertRaises(ValueError):
            reader.read1(-1)

    def test_readinto(self):
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
        target = bytearray(2)
        # (bytes reported as read, buffer contents afterwards); a short read
        # leaves the tail of the buffer untouched.
        steps = (
            (2, b"ab"),
            (2, b"cd"),
            (2, b"ef"),
            (1, b"gf"),
            (0, b"gf"),
        )
        for count_read, contents in steps:
            self.assertEqual(reader.readinto(target), count_read)
            self.assertEqual(target, contents)

    def test_readlines(self):
        def fresh_reader():
            return self.tp(self.MockRawIO((b"abc\n", b"d\n", b"ef")))
        every_line = [b"abc\n", b"d\n", b"ef"]
        self.assertEqual(fresh_reader().readlines(), every_line)
        self.assertEqual(fresh_reader().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(fresh_reader().readlines(None), every_line)

    def test_buffering(self):
        data = b"abcdefghi"
        total = len(data)

        # (buffer size, sizes requested from the buffered reader,
        #  sizes expected in the raw stream's read_history)
        scenarios = (
            (100, (3, 1, 4, 8), [total, 0]),
            (100, (3, 3, 3), [total]),
            (4, (1, 2, 4, 2), [4, 4, 1]),
        )

        for bufsize, requested, raw_history in scenarios:
            raw_stream = self.MockFileIO(data)
            reader = self.tp(raw_stream, buffer_size=bufsize)
            offset = 0
            for nbytes in requested:
                chunk = reader.read(nbytes)
                self.assertEqual(chunk, data[offset:offset + nbytes])
                offset += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(raw_stream.read_history, raw_history)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        chunks = (b"abc", b"d", None, b"efg", None, None, None)
        reader = self.tp(self.MockRawIO(chunks))
        self.assertEqual(b"abcd", reader.read(6))
        self.assertEqual(b"e", reader.read(1))
        self.assertEqual(b"fg", reader.read())
        self.assertEqual(b"", reader.peek(1))
        self.assertIsNone(reader.read())
        self.assertEqual(b"", reader.read())

        raw_stream = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", raw_stream.readall())
        self.assertIsNone(raw_stream.readall())

    def test_read_past_eof(self):
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", reader.read(9000))

    def test_read_all(self):
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", reader.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(values)
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode,
                           buffering=0) as raw:
                reader = self.tp(raw, 8)
                errors = []
                results = []
                def read_worker():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for size in cycle([1, 19]):
                            chunk = reader.read(size)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as exc:
                        errors.append(exc)
                        raise
                workers = [threading.Thread(target=read_worker)
                           for _ in range(20)]
                for worker in workers:
                    worker.start()
                time.sleep(0.02)  # yield
                for worker in workers:
                    worker.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                combined = b''.join(results)
                # Every byte value must have been read exactly N times.
                for value in range(256):
                    self.assertEqual(combined.count(bytes([value])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        # Same as the common check, repeated after a read has taken place.
        reader = self.tp(self.MockUnseekableIO(b"A" * 10))
        with self.assertRaises(self.UnsupportedOperation):
            reader.tell()
        with self.assertRaises(self.UnsupportedOperation):
            reader.seek(0)
        reader.read(1)
        with self.assertRaises(self.UnsupportedOperation):
            reader.seek(0)
        with self.assertRaises(self.UnsupportedOperation):
            reader.tell()

    def test_misbehaved_io(self):
        reader = self.tp(self.MisbehavedRawIO((b"abc", b"d", b"efg")))
        with self.assertRaises(IOError):
            reader.seek(0)
        with self.assertRaises(IOError):
            reader.tell()

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            layouts = (
                # Simple case: one raw read is enough to satisfy the request.
                [b"x" * n],
                # A more complex case where two raw reads are needed to
                # satisfy the request.
                [b"x" * (n - 1), b"x"],
            )
            for chunks in layouts:
                raw_stream = self.MockRawIO(chunks)
                reader = self.tp(raw_stream, bufsize)
                self.assertEqual(reader.read(n), b"x" * n)
                self.assertEqual(raw_stream._extraneous_reads, 0,
                    "failed for {}: {} != 0".format(
                        n, raw_stream._extraneous_reads))
966
967
class CBufferedReaderTest(BufferedReaderTest):
    # Runs the BufferedReader tests against io.BufferedReader and adds
    # checks that only apply to that implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After a failed __init__ the object must refuse to read.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # Opening in "w+b" creates TESTFN on disk; remove it once the test
        # is over so later tests start from a clean state.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Self-reference: only the cyclic collector can reclaim the object.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001008
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the BufferedReader tests against the implementation exposed by
    # the `pyio` module.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001011
Guido van Rossuma9e20242007-03-08 00:43:48 +00001012
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Write-side tests for the buffered type bound to `self.tp`.

    # Mode used when a test opens the scratch file for writing.
    write_mode = "wb"

    def test_constructor(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Re-running __init__ with valid sizes must keep the object usable.
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        # Non-positive buffer sizes are rejected.
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # detach() must push buffered data to the raw stream first.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # `intermediate_func` is called with the buffered object after every
        # write; it must not change what finally reaches the raw stream.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seeking away and back between writes, first with absolute and
        # then with relative offsets.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_destructor(self):
        # Buffered data must reach the raw stream when the object is
        # destroyed without an explicit flush.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # The test writes TESTFN; remove it afterwards so it is not left
        # behind for later tests.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode,
                           buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Every byte value must have been written exactly N times.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing the third positional argument must emit the deprecation
        # warning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001228
1229
class CBufferedWriterTest(BufferedWriterTest):
    # Runs the BufferedWriter tests against io.BufferedWriter and adds
    # checks that only apply to that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After a failed __init__ the object must refuse to write.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The test writes TESTFN; remove it afterwards so it is not left
        # behind for later tests.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Self-reference: only the cyclic collector can reclaim the object.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1267
1268
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the BufferedWriter tests against the implementation exposed by
    # the `pyio` module.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001271
class BufferedRWPairTest(unittest.TestCase):
    # Tests for the reader/writer pair type bound to `self.tp`.  Each pair
    # is built from two independent raw streams: one to read, one to write.

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_detach(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_deprecation(self):
        # Passing the fourth positional argument must emit the deprecation
        # warning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        # An explicit None hint must behave like no hint at all (the same
        # case BufferedReaderTest.test_readlines covers).
        self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # peek() may return more than requested, hence startswith().
        self.assertTrue(pair.peek(3).startswith(b"abc"))
        # Peeking must not consume anything.
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        # The pair reports a tty as soon as either side does.
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001389
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the BufferedRWPair tests against io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001392
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the BufferedRWPair tests against the implementation exposed by
    # the `pyio` module.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001395
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001396
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Shared tests for BufferedRandom objects.  The reader and writer test
    # suites are inherited; the tests added here interleave reads, writes
    # and seeks on one buffered object.  Subclasses set ``tp`` to the class
    # under test.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Both base classes define this test; run each one explicitly.
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", rw.read())
        # After the read, both writes have reached the raw stream as a
        # single chunk.
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        # Overwrite four bytes at the current position, then re-read it all.
        rw.write(b"123f")
        rw.seek(0, 0)
        self.assertEqual(b"asdf123fl", rw.read())
        self.assertEqual(9, rw.tell())
        # Seek relative to the end (whence=2), then to the current
        # position (whence=1).
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        rw.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        # A float offset is rejected.
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Helper shared by the test_flush_and_* tests.  ``read_func`` is
        # called as read_func(bufio[, n]) and must return the bytes read.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        # Modify the raw stream behind the buffered object's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # Adapt readinto() to the read_func(bufio[, n]) shape; a
            # negative n means "as much as possible" (9999-byte buffer).
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            # peek() does not advance the position, so do it by hand.
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        # Both base classes define this test; run each one explicitly.
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        # check_writes() is defined outside this class; each callback
        # defined here performs a peek on the buffered object it is given.
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)
        def _peek(bufio):
            # Step back one byte, peek, then restore the saved position.
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek)

    def test_writes_and_reads(self):
        def _read(bufio):
            # Step back one byte and read it again.
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            s = raw.getvalue()
            self.assertEqual(s,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        b = b"\x80\x81\x82\x83\x84"
        for i in range(0, len(b)):
            for j in range(i, len(b)):
                raw = self.BytesIO(b)
                bufio = self.tp(raw, 100)
                mutate(bufio, i, j)
                bufio.flush()
                # Build the expected bytes in the same order as the writes:
                # when i == j the later write (1) wins.
                expected = bytearray(b)
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        # Both base classes define this test; run each one explicitly.
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as raw:
            with self.tp(raw, 100) as f:
                # Alternate one-byte writes with each of the read-style
                # calls: read, read1, readinto, peek.
                f.write(b"1")
                self.assertEqual(f.read(1), b'b')
                f.write(b'2')
                self.assertEqual(f.read1(1), b'd')
                f.write(b'3')
                buf = bytearray(1)
                f.readinto(buf)
                self.assertEqual(buf, b'f')
                f.write(b'4')
                self.assertEqual(f.peek(1), b'h')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as raw:
            with self.tp(raw, 100) as f:
                self.assertEqual(f.read(1), b'a')
                f.write(b"2")
                self.assertEqual(f.read(1), b'c')
                f.flush()
                self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        # Same idea as test_interleaved_read_write, but with readline().
        with self.BytesIO(b'ab\ncdef\ng\n') as raw:
            with self.tp(raw) as f:
                f.write(b'1')
                self.assertEqual(f.readline(), b'b\n')
                f.write(b'2')
                self.assertEqual(f.readline(), b'def\n')
                f.write(b'3')
                self.assertEqual(f.readline(), b'\n')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None
1620
class CBufferedRandomTest(BufferedRandomTest):
    # Runs the shared BufferedRandom tests against io.BufferedRandom.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Re-initialising with a buffer size of sys.maxsize must fail with
        # one of these errors.
        acceptable = (OverflowError, MemoryError, ValueError)
        with self.assertRaises(acceptable):
            bufio.__init__(rawio, sys.maxsize)

    def test_garbage_collection(self):
        # Reuse the garbage-collection checks of both C test classes.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)
1637
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the shared BufferedRandom tests against pyio.BufferedRandom.
    tp = pyio.BufferedRandom
1640
1641
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001642# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1643# properties:
1644# - A single output character can correspond to many bytes of input.
1645# - The number of input bytes to complete the character can be
1646# undetermined until the last input byte is received.
1647# - The number of input bytes can vary depending on previous input.
1648# - A single input byte can correspond to many characters of output.
1649# - The number of output characters can be undetermined until the
1650# last input byte is received.
1651# - The number of output characters can vary depending on previous input.
1652
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    Stateful, buffering decoder used to exercise seek/tell behavior.

    The input is a stream of words.  A word is either fixed-length (its
    length is the current input length I) or, when I is 0, variable-length
    and terminated by a period; extra periods are ignored in that mode.
    Words are interpreted as follows:
      - 'i' followed by a number sets the input length, I (maximum 99).
        Setting I to 0 switches to period-terminated words.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is emitted followed by a period.  The emitted word
        is the input word truncated, or padded with hyphens, to exactly O
        characters.  If O is 0 the word is emitted verbatim.
    I and O both start at 1.  EOF (final=True) also terminates the last
    word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        # Back to the initial state: I = O = 1 and nothing buffered.
        self.buffer = bytearray()
        self.i = self.o = 1

    def getstate(self):
        # Both lengths are XORed with 1 so that the flags value is 0 right
        # after reset(); they are packed as I*100 + O.
        flags = (self.i ^ 1) * 100 + (self.o ^ 1)
        return bytes(self.buffer), flags

    def setstate(self, state):
        # Inverse of getstate().
        pending, flags = state
        self.buffer = bytearray(pending)
        packed_i, packed_o = divmod(flags, 100)
        self.i = packed_i ^ 1
        self.o = packed_o ^ 1

    def decode(self, input, final=False):
        pieces = []
        period = ord('.')
        for byte in input:
            if self.i == 0 and byte == period:
                # Variable-length mode: a period ends the current word and
                # is otherwise ignored.
                if self.buffer:
                    pieces.append(self.process_word())
                continue
            self.buffer.append(byte)
            # Fixed-length mode: the word ends after exactly self.i bytes.
            if self.i != 0 and len(self.buffer) == self.i:
                pieces.append(self.process_word())
        if final and self.buffer:  # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        # Consume the buffered word: either update I/O (returning '') or
        # return the formatted output for an ordinary word.
        word = self.buffer
        marker = word[0]
        text = ''
        if marker == ord('i'):
            self.i = min(99, int(word[1:] or 0))  # set input length
        elif marker == ord('o'):
            self.o = min(99, int(word[1:] or 0))  # set output length
        else:
            text = word.decode('ascii')
            if len(text) < self.o:
                text += '-' * self.o  # pad out with hyphens
            if self.o:
                text = text[:self.o]  # truncate to output length
            text += '.'
        self.buffer = bytearray()
        return text

    # The codec lookup below only answers while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        # Codec search function: resolves 'test_decoder' to this class while
        # the codec is enabled, and returns None otherwise.
        if not cls.codecEnabled or name != 'test_decoder':
            return None
        latin1 = codecs.lookup('latin-1')
        return codecs.CodecInfo(
            name='test_decoder', encode=latin1.encode, decode=None,
            incrementalencoder=None,
            streamreader=None, streamwriter=None,
            incrementaldecoder=cls)
1737
# Register the lookup function of StatefulIncrementalDecoder as a codec
# search function.  It answers only while the class attribute
# ``codecEnabled`` is true, which is not the default; tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1741
1742
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Sanity checks for the StatefulIncrementalDecoder test helper itself.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # One-shot decoding: a fresh decoder for every table entry.
        for data, final, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(data, final), expected)

        # An unfinished decode yields nothing until EOF is forced.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001785
1786class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001787
def setUp(self):
    # Sample data mixing "\r\n", "\r" and "\n" line endings, plus the same
    # text with every line ending written as "\n".
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
    # Remove any leftover scratch file before the test runs.
    support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001792
def tearDown(self):
    # Remove the scratch file the test may have created.
    support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001795
def test_constructor(self):
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    text = self.TextIOWrapper(buffered)
    # Calling __init__ again on a live wrapper must replace its settings.
    text.__init__(buffered, encoding="latin1", newline="\r\n")
    self.assertEqual(text.encoding, "latin1")
    self.assertEqual(text.line_buffering, False)
    text.__init__(buffered, encoding="utf8", line_buffering=True)
    self.assertEqual(text.encoding, "utf8")
    self.assertEqual(text.line_buffering, True)
    self.assertEqual("\xe9\n", text.readline())
    # Bad ``newline`` arguments: wrong type first, then an unsupported value.
    with self.assertRaises(TypeError):
        text.__init__(buffered, newline=42)
    with self.assertRaises(ValueError):
        text.__init__(buffered, newline='xyzzy')
1809
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)
    text = self.TextIOWrapper(buffered)
    # detach() hands back the very buffer object that was wrapped.
    self.assertIs(text.detach(), buffered)

    text = self.TextIOWrapper(buffered, encoding="ascii")
    text.write("howdy")
    # Nothing has reached the raw stream before detaching ...
    self.assertFalse(raw.getvalue())
    text.detach()
    # ... and the written text is there afterwards.
    self.assertEqual(raw.getvalue(), b"howdy")
    # Detaching the same wrapper a second time is rejected.
    with self.assertRaises(ValueError):
        text.detach()
1822
def test_repr(self):
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    text = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    def expect(template):
        # ``template`` has one %s slot for the implementing module name.
        self.assertEqual(repr(text), template % modname)

    expect("<%s.TextIOWrapper encoding='utf-8'>")
    # A name set on the raw stream shows up in the wrapper's repr.
    raw.name = "dummy"
    expect("<%s.TextIOWrapper name='dummy' encoding='utf-8'>")
    text.mode = "r"
    expect("<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>")
    # A bytes name is shown with its b'' prefix.
    raw.name = b"dummy"
    expect("<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>")
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001839
def test_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    text = self.TextIOWrapper(buffered, newline="\n", line_buffering=True)
    # (text written, raw stream content expected right after the write)
    steps = [
        ("X", b""),               # No flush happened
        ("Y\nZ", b"XY\nZ"),       # All got flushed
        ("A\rB", b"XY\nZA\rB"),
    ]
    for chunk, raw_content in steps:
        text.write(chunk)
        self.assertEqual(raw.getvalue(), raw_content)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001850
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="utf8")
    self.assertEqual(t.encoding, "utf8")
    # Without an explicit encoding the wrapper must still pick one ...
    t = self.TextIOWrapper(b)
    self.assertIsNotNone(t.encoding)
    # ... and it must name a codec that can be looked up (raises otherwise).
    codecs.lookup(t.encoding)
1859
def test_encoding_errors_reading(self):
    # b"\xff" cannot be decoded as ASCII.
    payload = b"abc\n\xff\n"
    # (1) default and (2) explicit strict: reading must fail
    for options in ({}, {"errors": "strict"}):
        text = self.TextIOWrapper(self.BytesIO(payload), encoding="ascii",
                                  **options)
        self.assertRaises(UnicodeError, text.read)
    # (3) ignore and (4) replace: reading succeeds with the bad byte
    # dropped or substituted
    lenient = (("ignore", "abc\n\n"), ("replace", "abc\n\ufffd\n"))
    for handler, decoded in lenient:
        text = self.TextIOWrapper(self.BytesIO(payload), encoding="ascii",
                                  errors=handler)
        self.assertEqual(text.read(), decoded)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001877
def test_encoding_errors_writing(self):
    # "\xff" cannot be encoded as ASCII.
    # (1) default and (2) explicit strict: writing must fail
    for options in ({}, {"errors": "strict"}):
        text = self.TextIOWrapper(self.BytesIO(), encoding="ascii",
                                  **options)
        self.assertRaises(UnicodeError, text.write, "\xff")
    # (3) ignore and (4) replace: writing succeeds with the bad character
    # dropped or substituted
    lenient = (("ignore", b"abcdef\n"), ("replace", b"abc?def\n"))
    for handler, encoded in lenient:
        sink = self.BytesIO()
        text = self.TextIOWrapper(sink, encoding="ascii", errors=handler,
                                  newline="\n")
        text.write("abc\xffdef\n")
        text.flush()
        self.assertEqual(sink.getvalue(), encoded)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001901
def test_newlines(self):
    input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]

    # (newline argument, lines expected when reading with it)
    tests = [
        [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
        [ '', input_lines ],
        [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
        [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
        [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )
    joined = ''.join(input_lines)

    def read_lines(textio, do_reads):
        # Either iterate over the wrapper directly, or read two characters
        # and then finish the line with readline().
        if not do_reads:
            return list(textio)
        collected = []
        for head in iter(lambda: textio.read(2), ''):
            self.assertEqual(len(head), 2)
            collected.append(head + textio.readline())
        return collected

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        data = joined.encode(encoding)
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    got_lines = read_lines(textio, do_reads)

                    for got_line, exp_line in zip(got_lines, exp_lines):
                        self.assertEqual(got_line, exp_line)
                    self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00001943
def test_newlines_input(self):
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    # (newline argument, lines expected from readlines())
    cases = [
        (None, normalized.decode("ascii").splitlines(True)),
        ("", testdata.decode("ascii").splitlines(True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, expected in cases:
        source = self.BytesIO(testdata)
        reader = self.TextIOWrapper(source, encoding="ascii", newline=newline)
        self.assertEqual(reader.readlines(), expected)
        # Reading everything at once must give the same text.
        reader.seek(0)
        self.assertEqual(reader.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00001959
def test_newlines_output(self):
    # Expected raw bytes for each explicit ``newline`` argument.
    expected_by_newline = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
    }
    # newline=None is expected to behave like the platform's os.linesep.
    tests = [(None, expected_by_newline[os.linesep])]
    tests += sorted(expected_by_newline.items())
    chunks = ("AAA\nB", "BB\nCCC\n", "X\rY\r\nZ")
    for newline, expected in tests:
        sink = self.BytesIO()
        writer = self.TextIOWrapper(sink, encoding="ascii", newline=newline)
        for chunk in chunks:
            writer.write(chunk)
        writer.flush()
        self.assertEqual(sink.closed, False)
        self.assertEqual(sink.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001977
def test_destructor(self):
    # Dropping the last reference to a wrapper must close the underlying
    # stream, with the pending text already written to it.
    seen_at_close = []
    base = self.BytesIO

    class MyBytesIO(base):
        def close(self):
            # Record the stream content at the moment close() is called.
            seen_at_close.append(self.getvalue())
            base.close(self)

    stream = MyBytesIO()
    wrapper = self.TextIOWrapper(stream, encoding="ascii")
    wrapper.write("abc")
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001991
def test_override_destructor(self):
    # Check the order in which an overridden __del__, close() and flush()
    # are invoked when a wrapper is destroyed: each override appends its
    # own marker to ``record``.
    record = []
    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            record.append(1)
            # The base class may not define __del__; only chain to it when
            # it exists.
            try:
                f = super().__del__
            except AttributeError:
                pass
            else:
                f()
        def close(self):
            record.append(2)
            super().close()
        def flush(self):
            record.append(3)
            super().flush()
    b = self.BytesIO()
    t = MyTextIO(b, encoding="ascii")
    del t
    support.gc_collect()
    # Expected order: __del__ (1), then close (2), then flush (3).
    self.assertEqual(record, [1, 2, 3])
2014
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()
    def f():
        # Accessing a missing attribute raises AttributeError; the
        # temporary wrapper has no other reference.
        self.TextIOWrapper(rawio).xyzzy
    with support.captured_output("stderr") as s:
        self.assertRaises(AttributeError, f)
    s = s.getvalue().strip()
    if s:
        # The destructor *may* have printed an unraisable error, check it
        self.assertEqual(len(s.splitlines()), 1)
        self.assertTrue(s.startswith("Exception IOError: "), s)
        self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002029
Guido van Rossum9b76da62007-04-11 01:09:03 +00002030 # Systematic tests of the text I/O API
2031
def test_basic_io(self):
    # Exercise write / read / seek / tell on a real file for a range of
    # chunk sizes and encodings.  The file is opened through ``with`` so it
    # is closed even when an assertion fails; an open handle would
    # otherwise outlive the test and get in the way of tearDown's unlink.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        # NOTE: "utf-16-be" and "utf-16-le" were left disabled in the
        # original encoding list.
        for enc in "ascii", "latin1", "utf8":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position cookie for the end of the three characters.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                # Append, then seek back to the cookie and read the new text.
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2060
def multi_line_test(self, f, enc):
    # Helper for test_basic_io: rewrite ``f`` with lines of many different
    # lengths, recording the tell() value before each line, then read the
    # file back and check that positions and contents match.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    period = len(sample)
    written = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # ``size`` characters cycling through ``sample``, plus a newline.
        text_line = "".join([sample[k % period] for k in range(size)]) + "\n"
        written.append((f.tell(), text_line))
        f.write(text_line)
    f.seek(0)
    read_back = []
    position, text_line = f.tell(), f.readline()
    while text_line:
        read_back.append((position, text_line))
        position, text_line = f.tell(), f.readline()
    self.assertEqual(read_back, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002082
def test_telling(self):
    # tell() values recorded while writing must be reproduced while reading
    # the same lines back.  The file is opened through ``with`` so it is
    # closed even when an assertion fails.
    with self.open(support.TESTFN, "w+", encoding="utf8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is refused while the file is being iterated over.
            self.assertRaises(IOError, f.tell)
        # Once the iteration is over, tell() works again.
        self.assertEqual(f.tell(), p2)
2102
def test_seeking(self):
    # Build a line whose ASCII prefix ends two characters short of the
    # default chunk size and whose suffix is a multi-byte character plus a
    # newline; write it twice, then check tell() and readline() right
    # after reading exactly the prefix.
    ascii_len = _default_chunk_size() - 2
    head_text = "a" * ascii_len
    head_bytes = head_text.encode("utf-8")
    self.assertEqual(len(head_text), len(head_bytes))
    tail_text = "\u8888\n"
    tail_bytes = tail_text.encode("utf-8")
    with self.open(support.TESTFN, "wb") as raw:
        raw.write((head_bytes + tail_bytes) * 2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as reader:
        first = reader.read(ascii_len)
        self.assertEqual(first, head_bytes.decode("ascii"))
        self.assertEqual(reader.tell(), ascii_len)
        self.assertEqual(reader.readline(), tail_text)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002119
def test_seeking_too(self):
    # Regression test for a specific bug: tell() after readline() on a
    # multi-byte character with a chunk size smaller than that character.
    with self.open(support.TESTFN, "wb") as raw:
        raw.write(b'\xe0\xbf\xbf\n')
    with self.open(support.TESTFN, "r", encoding="utf-8") as reader:
        reader._CHUNK_SIZE  # Just test that it exists
        reader._CHUNK_SIZE = 2
        reader.readline()
        # The test passes as long as this call does not raise.
        reader.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002130
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened through `with` so that a failing assertion
        # cannot leave it open.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        # NOTE(review): only the decode pass above sets _CHUNK_SIZE; the
        # re-opened files below read with the wrapper's own default chunk
        # size -- confirm that this is intended.
        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for encoded, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(encoded)

        # Position each test case so that it crosses a chunk boundary.
        for encoded, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(encoded)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + encoded, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002177
def test_encoded_writes(self):
    # Two consecutive writes through a UTF-16/UTF-32 wrapper must read
    # back as the doubled text and must leave exactly the bytes of a
    # single encode() call in the underlying buffer.
    payload = "1234567890"
    doubled = payload * 2
    encodings = [family + variant
                 for family in ("utf-16", "utf-32")
                 for variant in ("", "-le", "-be")]
    for encoding in encodings:
        raw = self.BytesIO()
        wrapper = self.TextIOWrapper(raw, encoding=encoding)
        # Check if the BOM is written only once (see issue1753).
        for _ in range(2):
            wrapper.write(payload)
        # Reading twice from the start must give the same answer.
        for _ in range(2):
            wrapper.seek(0)
            self.assertEqual(wrapper.read(), doubled)
        self.assertEqual(raw.getvalue(), doubled.encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002197
def test_unreadable(self):
    # Reading through a wrapper whose underlying stream reports
    # readable() == False must raise IOError.
    class UnReadable(self.BytesIO):
        def readable(self):
            return False

    wrapper = self.TextIOWrapper(UnReadable())
    with self.assertRaises(IOError):
        wrapper.read()
2204
def test_read_one_by_one(self):
    # Reading one character at a time must still collapse "\r\n" to "\n".
    wrapper = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    # iter() with a sentinel keeps calling read(1) until it returns "".
    collected = "".join(iter(lambda: wrapper.read(1), ""))
    self.assertEqual(collected, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002214
def test_readlines(self):
    # readlines() with no hint, with None, and with a size hint of 5.
    wrapper = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
    every_line = ["AA\n", "BB\n", "CC"]
    self.assertEqual(wrapper.readlines(), every_line)
    for hint, expected in ((None, every_line), (5, ["AA\n", "BB\n"])):
        wrapper.seek(0)
        self.assertEqual(wrapper.readlines(hint), expected)
2222
# Read in amounts of 128 characters, the value the original comment gives
# for TextIOWrapper._CHUNK_SIZE.
def test_read_by_chunk(self):
    # 127 filler bytes place the "\r\n" pair across the 128-char boundary.
    wrapper = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
    # iter() with a sentinel keeps calling read(128) until it returns "".
    collected = "".join(iter(lambda: wrapper.read(128), ""))
    self.assertEqual(collected, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002234
def test_issue1395_1(self):
    # Reading self.testdata one character at a time must yield
    # self.normalized.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    collected = ""
    # iter() with a sentinel stops at the first empty read.
    for char in iter(lambda: wrapper.read(1), ""):
        collected += char
    self.assertEqual(collected, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002246
def test_issue1395_2(self):
    # Same as test_issue1395_1 but with a chunk size of 4 and reads of
    # 4 characters at a time.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    collected = ""
    # iter() with a sentinel stops at the first empty read.
    for piece in iter(lambda: wrapper.read(4), ""):
        collected += piece
    self.assertEqual(collected, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002258
def test_issue1395_3(self):
    # Mix two read(4) calls with three readline() calls on a wrapper
    # whose chunk size is 4; the pieces must add up to self.normalized.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # List elements are evaluated left to right, i.e. in call order.
    pieces = [
        wrapper.read(4),
        wrapper.read(4),
        wrapper.readline(),
        wrapper.readline(),
        wrapper.readline(),
    ]
    self.assertEqual("".join(pieces), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002269
def test_issue1395_4(self):
    # A read(4) followed by an unbounded read() with a chunk size of 4
    # must together yield self.normalized.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    head = wrapper.read(4)
    rest = wrapper.read()
    self.assertEqual(head + rest, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002277
def test_issue1395_5(self):
    # Seeking back to a cookie taken after read(4) must resume reading
    # at the same place, with a chunk size of 4.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    wrapper.read(4)  # advance past the first four characters
    cookie = wrapper.tell()
    wrapper.seek(0)
    wrapper.seek(cookie)
    self.assertEqual(wrapper.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002287
def test_issue2282(self):
    # The wrapper must report the same seekable() value as the buffer
    # it wraps.
    raw = self.BytesIO(self.testdata)
    wrapper = self.TextIOWrapper(raw, encoding="ascii")

    raw_answer = raw.seekable()
    self.assertEqual(raw_answer, wrapper.seekable())
2293
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    path = support.TESTFN

    def on_disk():
        # Raw bytes currently stored in the test file.
        with self.open(path, 'rb') as raw:
            return raw.read()

    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as writer:
            writer.write('aaa')
            writer.tell()  # result unused; the call itself is kept
        self.assertEqual(on_disk(), 'aaa'.encode(charset))

        with self.open(path, 'a', encoding=charset) as appender:
            appender.write('xxx')
        self.assertEqual(on_disk(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002308
def test_seek_bom(self):
    # The BOM must appear only once when the file is extended by seeking
    # to its end and then overwritten from the start.
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as writer:
            writer.write('aaa')
            end_cookie = writer.tell()
        with self.open(path, 'r+', encoding=charset) as updater:
            updater.seek(end_cookie)
            updater.write('zzz')
            updater.seek(0)
            updater.write('bbb')
        with self.open(path, 'rb') as raw:
            stored = raw.read()
            self.assertEqual(stored, 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002323
def test_errors_property(self):
    # `errors` reports "strict" when not given and echoes an explicit
    # value otherwise.
    scenarios = (({}, "strict"), ({"errors": "replace"}, "replace"))
    for extra, expected in scenarios:
        with self.open(support.TESTFN, "w", **extra) as f:
            self.assertEqual(f.errors, expected)
2329
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    go = threading.Event()
    worker_ids = range(20)
    with self.open(support.TESTFN, "w", buffering=1) as out:
        def emit(ident):
            record = "Thread%03d\n" % ident
            # All workers block here until the event is set below.
            go.wait()
            out.write(record)

        workers = [threading.Thread(target=emit, args=(ident,))
                   for ident in worker_ids]
        for worker in workers:
            worker.start()
        time.sleep(0.02)
        go.set()
        for worker in workers:
            worker.join()
    with self.open(support.TESTFN) as result:
        content = result.read()
        for ident in worker_ids:
            self.assertEqual(content.count("Thread%03d\n" % ident), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002351
def test_flush_error_on_close(self):
    # An IOError raised by flush() must propagate out of close().
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")

    def failing_flush():
        raise IOError()

    wrapper.flush = failing_flush
    with self.assertRaises(IOError):  # exception not swallowed
        wrapper.close()
2358
def test_multi_close(self):
    # close() may be called repeatedly; flush() afterwards must raise
    # ValueError.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    for _ in range(3):
        wrapper.close()
    with self.assertRaises(ValueError):
        wrapper.flush()
2365
def test_unseekable(self):
    # tell() and seek() on a wrapper around MockUnseekableIO must raise
    # UnsupportedOperation.
    wrapper = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.tell()
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.seek(0)
2370
def test_readonly_attributes(self):
    # Assigning to the wrapper's `buffer` attribute must raise
    # AttributeError.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    replacement = self.BytesIO(self.testdata)
    with self.assertRaises(AttributeError):
        wrapper.buffer = replacement
2376
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    wrapper = self.TextIOWrapper(self.MockRawIO(chunks),
                                 encoding='ascii', newline='\n')
    # Reads
    self.assertEqual(wrapper.read(4), 'abcd')
    self.assertEqual(wrapper.readline(), 'efghi\n')
    remaining = list(wrapper)
    self.assertEqual(remaining, ['jkl\n', 'opq\n'])
2387
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                                 write_through=True)
    for fragment in ('1', '23\n4', '5'):
        wrapper.write(fragment)
    # No flush() was called, yet everything must already be in the raw
    # object's write stack.
    self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2397
class CTextIOWrapperTest(TextIOWrapperTest):

    def test_initialization(self):
        # A failed re-__init__() must leave the wrapper unusable: read()
        # raises ValueError afterwards.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        bad_newlines = ((TypeError, 42), (ValueError, 'xyzzy'))
        for expected_error, newline in bad_newlines:
            self.assertRaises(expected_error, wrapper.__init__, buffered,
                              newline=newline)
            self.assertRaises(ValueError, wrapper.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        raw = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(raw)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        # Self-reference: only the cycle collector can free the wrapper.
        wrapper.x = wrapper
        probe = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertTrue(probe() is None, probe)
        with self.open(support.TESTFN, "rb") as stored:
            self.assertEqual(stored.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            first_pair = self.BufferedRWPair(self.MockRawIO(),
                                             self.MockRawIO())
            first = self.TextIOWrapper(first_pair, encoding="ascii")
            second_pair = self.BufferedRWPair(self.MockRawIO(),
                                              self.MockRawIO())
            second = self.TextIOWrapper(second_pair, encoding="ascii")
            # circular references
            first.buddy = second
            second.buddy = first
        support.gc_collect()
2438
2439
class PyTextIOWrapperTest(TextIOWrapperTest):
    """Run the inherited TextIOWrapperTest cases unchanged.

    Presumably bound to the pure-Python implementation by code elsewhere
    in this file.
    """
2442
2443
class IncrementalNewlineDecoderTest(unittest.TestCase):

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def expect(data, text, **options):
            # We exercise getstate() / setstate() as well as decode():
            # decoding the same input again from the saved state must
            # give the same output.
            saved = decoder.getstate()
            first = decoder.decode(data, **options)
            self.assertEqual(first, text)
            decoder.setstate(saved)
            second = decoder.decode(data, **options)
            self.assertEqual(second, text)

        # A three-byte character, whole and then split byte by byte; the
        # last entry leaves the decoder holding an incomplete sequence.
        multibyte_steps = (
            (b'\xe8\xa2\x88', "\u8888"),
            (b'\xe8', ""),
            (b'\xa2', ""),
            (b'\x88', "\u8888"),
            (b'\xe8', ""),
            (b'\xa2', ""),
            (b'\x88', "\u8888"),
            (b'\xe8', ""),
        )
        for data, text in multibyte_steps:
            expect(data, text)
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'',
                          final=True)

        decoder.reset()
        expect(b'\n', "\n")
        expect(b'\r', "")
        expect(b'', "\n", final=True)
        expect(b'\r', "\n", final=True)

        newline_steps = (
            (b'\r', ""),
            (b'a', "\na"),
            (b'\r\r\n', "\n\n"),
            (b'\r', ""),
            (b'\r', "\n"),
            (b'\na', "\na"),
            (b'\xe8\xa2\x88\r\n', "\u8888\n"),
            (b'\xe8\xa2\x88', "\u8888"),
            (b'\n', "\n"),
            (b'\xe8\xa2\x88\r', "\u8888"),
            (b'\n', "\n"),
        )
        for data, text in newline_steps:
            expect(data, text)

    def check_newline_decoding(self, decoder, encoding):
        # Feed text to the decoder one unit at a time and check both the
        # translated output and the `newlines` attribute along the way.
        pieces = []
        if encoding is None:
            encoder = None
        else:
            encoder = codecs.getincrementalencoder(encoding)()

        def feed(text):
            if encoder is None:
                # Decode one char at a time
                units = text
            else:
                # Decode one byte at a time
                units = (bytes([byte]) for byte in encoder.encode(text))
            for unit in units:
                pieces.append(decoder.decode(unit))

        self.assertEqual(decoder.newlines, None)
        expectations = (
            ("abc\n\r", '\n'),
            ("\nabc", ('\n', '\r\n')),
            ("abc\r", ('\n', '\r\n')),
            ("abc", ('\r', '\n', '\r\n')),
        )
        for chunk, seen_newlines in expectations:
            feed(chunk)
            self.assertEqual(decoder.newlines, seen_newlines)
        feed("abc\r")
        self.assertEqual("".join(pieces), "abc\n\nabcabc\nabcabc")
        decoder.reset()
        payload = "abc"
        if encoder is not None:
            encoder.reset()
            payload = encoder.encode(payload)
        self.assertEqual(decoder.decode(payload), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc is None:
                inner = None
            else:
                inner = codecs.getincrementaldecoder(enc)()
            wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(wrapped, enc)
        inner = codecs.getincrementaldecoder("utf-8")()
        wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding_utf8(wrapped)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        for translate in (False, True):
            dec = self.IncrementalNewlineDecoder(None, translate=translate)
            self.assertEqual(dec.newlines, None)
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)
2549
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Run the inherited IncrementalNewlineDecoderTest cases unchanged.

    Presumably bound to the C implementation by code elsewhere in this
    file.
    """
2552
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Run the inherited IncrementalNewlineDecoderTest cases unchanged.

    Presumably bound to the pure-Python implementation by code elsewhere
    in this file.
    """
Antoine Pitrou180a3362008-12-14 16:36:46 +00002555
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002556
Guido van Rossum01a27522007-03-07 01:00:12 +00002557# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002558
Guido van Rossum5abbf752007-08-27 17:39:33 +00002559class MiscIOTest(unittest.TestCase):
2560
def tearDown(self):
    # Remove the scratch file the tests in this class write to.
    scratch = support.TESTFN
    support.unlink(scratch)
2563
def test___all__(self):
    # Every name in __all__ must exist.  Exception names must be
    # Exception subclasses; everything else apart from open() and the
    # SEEK_* constants must be an IOBase subclass.
    for name in self.io.__all__:
        exported = getattr(self.io, name, None)
        self.assertTrue(exported is not None, name)
        if name == "open":
            continue
        names_an_exception = (name == "UnsupportedOperation"
                              or "error" in name.lower())
        if names_an_exception:
            self.assertTrue(issubclass(exported, Exception), name)
        elif not name.startswith("SEEK_"):
            self.assertTrue(issubclass(exported, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002574
def test_attributes(self):
    # Check the `name` and `mode` attributes exposed at each layer of
    # the objects returned by open().
    # Every file is managed by a `with` block so that it is closed even
    # when one of the assertions fails.
    with self.open(support.TESTFN, "wb", buffering=0) as f:
        self.assertEqual(f.mode, "wb")

    with self.open(support.TESTFN, "U") as f:
        self.assertEqual(f.name, support.TESTFN)
        self.assertEqual(f.buffer.name, support.TESTFN)
        self.assertEqual(f.buffer.raw.name, support.TESTFN)
        self.assertEqual(f.mode, "U")
        self.assertEqual(f.buffer.mode, "rb")
        self.assertEqual(f.buffer.raw.mode, "rb")

    with self.open(support.TESTFN, "w+") as f:
        self.assertEqual(f.mode, "w+")
        self.assertEqual(f.buffer.mode, "rb+")  # Does it really matter?
        self.assertEqual(f.buffer.raw.mode, "rb+")

        # g is opened on f's descriptor with closefd=False; its `name`
        # is that descriptor number.
        with self.open(f.fileno(), "wb", closefd=False) as g:
            self.assertEqual(g.mode, "wb")
            self.assertEqual(g.raw.mode, "wb")
            self.assertEqual(g.name, f.fileno())
            self.assertEqual(g.raw.name, f.fileno())
2601
def test_io_after_close(self):
    # Every I/O method of a closed file must raise ValueError, for text,
    # buffered and raw files alike.
    def open_variants():
        # Same order as a hand-written list: for each base mode the text
        # file, the buffered binary file, two explicit text buffer sizes
        # and finally the unbuffered binary file.
        for base in ("w", "r", "w+"):
            yield {"mode": base}
            yield {"mode": base + "b"}
            yield {"mode": base, "buffering": 1}
            yield {"mode": base, "buffering": 2}
            yield {"mode": base + "b", "buffering": 0}

    for kwargs in open_variants():
        f = self.open(support.TESTFN, **kwargs)
        f.close()

        def refuses(method, *args):
            self.assertRaises(ValueError, getattr(f, method), *args)

        def refuses_if_present(method, *args):
            # Some methods exist only on certain layers.
            if hasattr(f, method):
                refuses(method, *args)

        refuses("flush")
        refuses("fileno")
        refuses("isatty")
        refuses("__iter__")
        refuses_if_present("peek", 1)
        refuses("read")
        refuses_if_present("read1", 1024)
        refuses_if_present("readall")
        refuses_if_present("readinto", bytearray(1024))
        refuses("readline")
        refuses("readlines")
        refuses("seek", 0)
        refuses("tell")
        refuses("truncate")
        # write() needs an argument of the right type for the file kind.
        nothing = b"" if "b" in kwargs['mode'] else ""
        refuses("write", nothing)
        refuses("writelines", [])
        self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002644
def test_blockingioerror(self):
    # Various BlockingIOError issues
    rejected_signatures = ((), (1,), (1, 2, 3, 4), (1, "", None))
    for bad_args in rejected_signatures:
        self.assertRaises(TypeError, self.BlockingIOError, *bad_args)
    plain = self.BlockingIOError(1, "")
    self.assertEqual(plain.characters_written, 0)

    class C(str):
        pass

    message = C("")
    error = self.BlockingIOError(1, message)
    # Build a reference cycle between the error and its message, then
    # check that it is collected once both names are dropped.
    message.b = error
    error.c = message
    probe = weakref.ref(message)
    del message, error
    support.gc_collect()
    self.assertTrue(probe() is None, probe)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002663
def test_abcs(self):
    # Test the visible base classes are ABCs.
    visible_bases = (
        self.IOBase,
        self.RawIOBase,
        self.BufferedIOBase,
        self.TextIOBase,
    )
    for base in visible_bases:
        self.assertIsInstance(base, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002670
def _check_abc_inheritance(self, abcmodule):
    # Open a raw, a buffered and a text file.  Each must be an instance
    # of abcmodule.IOBase and of exactly one of the three layer ABCs.
    layers = ("RawIOBase", "BufferedIOBase", "TextIOBase")
    scenarios = (
        ("wb", {"buffering": 0}, "RawIOBase"),
        ("wb", {}, "BufferedIOBase"),
        ("w", {}, "TextIOBase"),
    )
    for mode, extra, own_layer in scenarios:
        with self.open(support.TESTFN, mode, **extra) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            for layer in layers:
                layer_abc = getattr(abcmodule, layer)
                if layer == own_layer:
                    self.assertIsInstance(f, layer_abc)
                else:
                    self.assertNotIsInstance(f, layer_abc)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002687
def test_abc_inheritance(self):
    # Test implementations inherit from their respective ABCs; the ABCs
    # are looked up on the test case itself.
    abc_source = self
    self._check_abc_inheritance(abc_source)
2691
def test_abc_inheritance_official(self):
    # Test implementations inherit from the official ABCs of the
    # baseline "io" module.
    abc_source = io
    self._check_abc_inheritance(abc_source)
2696
def _check_warn_on_dealloc(self, *args, **kwargs):
    # Open a file with the builtin open(), drop the only reference to it
    # without closing, and check that a ResourceWarning mentioning the
    # file's repr() is emitted.
    handle = open(*args, **kwargs)
    description = repr(handle)
    with self.assertWarns(ResourceWarning) as caught:
        # Dropping the name releases the last reference to the file.
        del handle
        support.gc_collect()
    warning_text = str(caught.warning.args[0])
    self.assertIn(description, warning_text)
2704
def test_warn_on_dealloc(self):
    # Check the raw, buffered and text layers in turn.
    scenarios = (("wb", {"buffering": 0}), ("wb", {}), ("w", {}))
    for mode, extra in scenarios:
        self._check_warn_on_dealloc(support.TESTFN, mode, **extra)
2709
def _check_warn_on_dealloc_fd(self, *args, **kwargs):
    # Same check as _check_warn_on_dealloc, but for a file opened on a
    # pipe descriptor; also checks that closefd=False gives no warning.
    opened = []

    def close_all():
        for fd in opened:
            try:
                os.close(fd)
            except EnvironmentError as exc:
                # A descriptor that is already closed is fine here.
                if exc.errno != errno.EBADF:
                    raise

    self.addCleanup(close_all)
    read_end, write_end = os.pipe()
    opened.extend((read_end, write_end))
    self._check_warn_on_dealloc(read_end, *args, **kwargs)
    # When using closefd=False, there's no warning
    read_end, write_end = os.pipe()
    opened.extend((read_end, write_end))
    with warnings.catch_warnings(record=True) as recorded:
        open(read_end, *args, closefd=False, **kwargs)
        support.gc_collect()
    self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00002730
def test_warn_on_dealloc_fd(self):
    # Same check for an unbuffered binary, a buffered binary and a text
    # file, in that order.
    variants = (
        ("rb", {"buffering": 0}),
        ("rb", {}),
        ("r", {}),
    )
    for mode, extra in variants:
        self._check_warn_on_dealloc_fd(mode, **extra)
2735
2736
def test_pickling(self):
    # Pickling file objects is forbidden
    #
    # For each base mode, try the text file, the buffered binary file and
    # the unbuffered binary file.  The write modes come first so that the
    # file already exists when it is opened for reading.
    open_variants = []
    for base_mode in ("w", "r", "w+"):
        binary_mode = base_mode + "b"
        open_variants.append({"mode": base_mode})
        open_variants.append({"mode": binary_mode})
        open_variants.append({"mode": binary_mode, "buffering": 0})

    for kwargs in open_variants:
        for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.open(support.TESTFN, **kwargs) as f:
                with self.assertRaises(TypeError):
                    pickle.dumps(f, protocol)
2753
@unittest.skipUnless(fcntl, 'fcntl required for this test')
def test_nonblock_pipe_write_bigbuf(self):
    # Run the non-blocking pipe scenario with a 16 KiB buffer.
    big_buffer = 16 * 1024
    self._test_nonblock_pipe_write(big_buffer)
2757
@unittest.skipUnless(fcntl, 'fcntl required for this test')
def test_nonblock_pipe_write_smallbuf(self):
    # Run the non-blocking pipe scenario with a 1 KiB buffer.
    small_buffer = 1024
    self._test_nonblock_pipe_write(small_buffer)
2761
2762 def _set_non_blocking(self, fd):
2763 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
2764 self.assertNotEqual(flags, -1)
2765 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
2766 self.assertEqual(res, 0)
2767
2768 def _test_nonblock_pipe_write(self, bufsize):
2769 sent = []
2770 received = []
2771 r, w = os.pipe()
2772 self._set_non_blocking(r)
2773 self._set_non_blocking(w)
2774
2775 # To exercise all code paths in the C implementation we need
2776 # to play with buffer sizes. For instance, if we choose a
2777 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
2778 # then we will never get a partial write of the buffer.
2779 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
2780 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
2781
2782 with rf, wf:
2783 for N in 9999, 73, 7574:
2784 try:
2785 i = 0
2786 while True:
2787 msg = bytes([i % 26 + 97]) * N
2788 sent.append(msg)
2789 wf.write(msg)
2790 i += 1
2791
2792 except self.BlockingIOError as e:
2793 self.assertEqual(e.args[0], errno.EAGAIN)
2794 sent[-1] = sent[-1][:e.characters_written]
2795 received.append(rf.read())
2796 msg = b'BLOCKED'
2797 wf.write(msg)
2798 sent.append(msg)
2799
2800 while True:
2801 try:
2802 wf.flush()
2803 break
2804 except self.BlockingIOError as e:
2805 self.assertEqual(e.args[0], errno.EAGAIN)
2806 self.assertEqual(e.characters_written, 0)
2807 received.append(rf.read())
2808
2809 received += iter(rf.read, None)
2810
2811 sent, received = b''.join(sent), b''.join(received)
2812 self.assertTrue(sent == received)
2813 self.assertTrue(wf.closed)
2814 self.assertTrue(rf.closed)
2815
class CMiscIOTest(MiscIOTest):
    # Runs the shared MiscIOTest cases with the `io` module bound as the
    # implementation under test.
    io = io
2818
class PyMiscIOTest(MiscIOTest):
    # Runs the shared MiscIOTest cases with the `pyio` module bound as the
    # implementation under test.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002821
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002822
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Tests of how io objects behave when SIGALRM arrives mid-operation.

    The implementation under test is looked up as ``self.io``; this class
    does not define that attribute itself.
    """

    def setUp(self):
        # Install a handler that raises, remembering the previous handler
        # so that tearDown() can put it back.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        # Disarm any alarm a failed test left pending, so that it cannot
        # fire later while unrelated code is running.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Deliberately raises ZeroDivisionError: tests use that exception
        # as the proof that the signal handler ran.
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
                     'issue #12429: skip test on FreeBSD <= 7')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        *item* is the unit of data written (repeated 1024 * 1024 times),
        *bytes* is what is expected to come out of the pipe, and
        *fdopen_kwargs* are passed to ``self.io.open()``.
        """
        read_results = []
        def _read():
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # Bound before the try block so that the cleanup below still works
        # when open() itself raises.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            signal.alarm(1)
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed above.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            self.assertRaises(ZeroDivisionError,
                              wio.write, item * (1024 * 1024))
            t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check the outcome of write() being re-entered from a signal
        handler: either the reentrant call fails with RuntimeError, or the
        handler's ZeroDivisionError propagates.

        *data* is the unit written on each call and *fdopen_kwargs* are
        passed to ``self.io.open()``.
        """
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            # Close the read end even if closing the writer fails.
            try:
                wio.close()
            finally:
                os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *decode* converts the value returned by read() to a str, and
        *fdopen_kwargs* are passed to ``self.io.open()``.
        """
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound before the try block so that the cleanup below still works
        # when open() itself raises.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *item* is the unit of data written (repeated N times) and
        *fdopen_kwargs* are passed to ``self.io.open()``.
        """
        select = support.import_module("select")
        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = 1024 * 1024
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        def _read():
            while not write_finished:
                while r in select.select([r], [], [], 1.0)[0]:
                    s = os.read(r, 1024)
                    read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            t.start()
        signal.signal(signal.SIGALRM, alarm1)
        # Bound before the try block so that the cleanup below still works
        # when open() itself raises.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            self.assertEqual(N, wio.write(item * N))
            wio.flush()
            write_finished = True
            t.join()
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3011
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003012
class CSignalsTest(SignalsTest):
    # Runs the shared SignalsTest cases with the `io` module bound as the
    # implementation under test.
    io = io
3015
class PySignalsTest(SignalsTest):
    # Runs the shared SignalsTest cases with the `pyio` module bound as
    # the implementation under test.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = test_reentrant_write_text = None
3023
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003024
def test_main():
    """Attach the io namespaces to the test classes, then run them all.

    Test classes whose name starts with "C" receive the names of the `io`
    module, those starting with "Py" the names of the `pyio` module;
    other classes are left untouched.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name: getattr(io, name) for name in all_members}
    py_io_ns = {name: getattr(pyio, name) for name in all_members}
    # Class-name prefix -> namespace to inject; "C" is tried before "Py".
    prefixed_namespaces = (("C", c_io_ns), ("Py", py_io_ns))

    # Each mock exists in a "C"- and a "Py"-prefixed flavour at module
    # level; expose the matching flavour under the unprefixed name.
    globs = globals()
    for prefix, namespace in prefixed_namespaces:
        namespace.update((mock.__name__, globs[prefix + mock.__name__])
                         for mock in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper

    for test in tests:
        for prefix, namespace in prefixed_namespaces:
            if test.__name__.startswith(prefix):
                for name, obj in namespace.items():
                    setattr(test, name, obj)
                break

    support.run_unittest(*tests)


if __name__ == "__main__":
    test_main()