blob: 1129484533aed2bf4afda68a8d46cf2bbba2c0a2 [file] [log] [blame]
"""Unit tests for the io module."""

# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)

################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import random
Guido van Rossum28524c72007-02-27 05:47:44 +000027import unittest
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000028import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000029import abc
Antoine Pitroub46b9d52010-08-21 19:09:32 +000030import signal
31import errno
Antoine Pitroue033e062010-10-29 10:38:18 +000032import warnings
Antoine Pitrou243757e2010-11-05 21:15:39 +000033import pickle
Georg Brandl1b37e872010-03-14 10:45:50 +000034from itertools import cycle, count
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000035from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000036from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000037
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000038import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000039import io # C implementation of io
40import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000041try:
42 import threading
43except ImportError:
44 threading = None
Guido van Rossum28524c72007-02-27 05:47:44 +000045
Guido van Rossuma9e20242007-03-08 00:43:48 +000046
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000047def _default_chunk_size():
48 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000049 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000050 return f._CHUNK_SIZE
51
52
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto().

    ``read_stack`` is a sequence of chunks handed out, in order, by
    readinto(); a ``None`` entry makes readinto() return None once.
    Every write() is recorded in ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        # Number of readinto() calls, and how many of those happened after
        # the read stack was already exhausted.
        self._reads = 0
        self._extraneous_reads = 0

    def write(self, b):
        # Store an immutable copy so later mutation of b is not observed.
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the next pending chunk into buf.

        Returns the number of bytes copied, 0 when the read stack is empty,
        or None when the next stack entry is None.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            self._extraneous_reads += 1
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            # The whole chunk fits: consume it.
            del self._read_stack[0]
            buf[:n] = data
            return n
        # Chunk is larger than buf: fill buf and keep the remainder queued.
        buf[:] = data[:max_len]
        self._read_stack[0] = data[max_len:]
        return max_len

    def truncate(self, pos=None):
        return pos
108
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead on top of the C implementation's RawIOBase."""


class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead on top of the Python implementation's RawIOBase."""
114
115
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus an explicit read() that pops whole chunks."""

    def read(self, n=None):
        """Return the next queued chunk unchanged; n is ignored.

        Returns b"" (and counts an extraneous read) once the read stack is
        exhausted.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an empty stack is expected here; anything else (including
            # KeyboardInterrupt) must propagate rather than be swallowed.
            self._extraneous_reads += 1
            return b""
125
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO on top of the C implementation's RawIOBase."""


class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO on top of the Python implementation's RawIOBase."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000131
Guido van Rossuma9e20242007-03-08 00:43:48 +0000132
class MisbehavedRawIO(MockRawIO):
    """A MockRawIO whose methods deliberately return inconsistent values."""

    def write(self, b):
        # Report twice as many bytes as were actually recorded.
        return super().write(b) * 2

    def read(self, n=None):
        # Return the queued chunk repeated twice.
        return super().read(n) * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Perform the real read, then claim far more bytes than buf holds.
        super().readinto(buf)
        return len(buf) * 5
148 return len(buf) * 5
149
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO on top of the C implementation's RawIOBase."""


class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO on top of the Python implementation's RawIOBase."""
155
156
class CloseFailureIO(MockRawIO):
    """A MockRawIO whose first close() call raises IOError."""

    closed = 0

    def close(self):
        if not self.closed:
            # Mark as closed *before* raising so that later close() calls
            # are silent no-ops.
            self.closed = 1
            raise IOError
163 raise IOError
164
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO on top of the C implementation's RawIOBase."""


class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO on top of the Python implementation's RawIOBase."""
170
171
class MockFileIO:
    """Mixin recording the size of every read()/readinto() result.

    Must be combined with a base class providing ``__init__(data)``,
    ``read()`` and ``readinto()``.  Results are appended to
    ``read_history``.
    """

    def __init__(self, data):
        # Set up the history before delegating, in case the base
        # constructor triggers a read.
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        res = super().read(n)
        self.read_history.append(None if res is None else len(res))
        return res

    def readinto(self, b):
        res = super().readinto(b)
        self.read_history.append(res)
        return res
186 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000187
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO backed by the C implementation's BytesIO."""


class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO backed by the Python implementation's BytesIO."""
193
194
class MockUnseekableIO:
    """Mixin turning a stream into an unseekable one.

    Subclasses must define ``UnsupportedOperation``, the exception type
    raised by seek() and tell().
    """

    def seekable(self):
        return False

    def seek(self, *args):
        raise self.UnsupportedOperation("not seekable")

    def tell(self, *args):
        raise self.UnsupportedOperation("not seekable")
203 raise self.UnsupportedOperation("not seekable")
204
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """MockUnseekableIO backed by the C implementation's BytesIO."""

    UnsupportedOperation = io.UnsupportedOperation


class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """MockUnseekableIO backed by the Python implementation's BytesIO."""

    UnsupportedOperation = pyio.UnsupportedOperation
210
211
class MockNonBlockWriterIO:
    """A writable raw stream that can simulate a blocking write.

    Subclasses must define ``BlockingIOError``, the exception type raised
    when the blocker character set via block_on() is encountered.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and clear the record."""
        s = b"".join(self._write_stack)
        self._write_stack[:] = []
        return s

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Record b, or only the part before the blocker char.

        When the blocker char occurs in b, the bytes preceding it are
        recorded, the blocker is disarmed, and BlockingIOError is raised
        with the number of bytes recorded as its third argument.
        """
        b = bytes(b)
        n = -1
        if self._blocker_char:
            try:
                n = b.index(self._blocker_char)
            except ValueError:
                # Blocker not present: fall through to a normal full write.
                pass
            else:
                # One-shot: the next write of the same data goes through.
                self._blocker_char = None
                self._write_stack.append(b[:n])
                raise self.BlockingIOError(0, "test blocking", n)
        self._write_stack.append(b)
        return len(b)
249 return len(b)
250
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO on top of the C implementation's RawIOBase."""

    BlockingIOError = io.BlockingIOError


class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO on top of the Python implementation's RawIOBase."""

    BlockingIOError = pyio.BlockingIOError
256
Guido van Rossuma9e20242007-03-08 00:43:48 +0000257
class IOTest(unittest.TestCase):
    # Implementation-independent tests of the io module.  The names used
    # through self (open, BytesIO, FileIO, IOBase, RawIOBase,
    # BufferedIOBase, TextIOBase, UnsupportedOperation, SEEK_CUR, SEEK_END,
    # MockRawIOWithoutRead) are not defined in this class and must be
    # provided by subclasses or injected from outside.
    #
    # Test methods are documented with comments rather than docstrings so
    # that unittest keeps reporting them by name.

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        """Run a fixed write/seek/truncate sequence on the writable stream f.

        The stream ends up containing b"hello world\\n".
        """
        self.assertEqual(f.write(b"blah."), 5)
        # truncate() must not move the file position.
        f.truncate(0)
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Run a fixed read/seek sequence on f, which must contain
        b"hello world\\n".

        With buffered=True, argument-less read() calls are checked too.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        # Short read at EOF: only the first two bytes are overwritten.
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    LARGE = 2**31

    def large_file_ops(self, f):
        """Exercise seek/tell/write/truncate around offset LARGE on f."""
        # Use unittest assertions rather than bare assert statements, which
        # are stripped when Python runs with -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        # truncate() must not move the file position.
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        exc = self.UnsupportedOperation
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(exc, fp.read)
                self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "wb", buffering=0) as fp:
            self.assertRaises(exc, fp.read)
            self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "rb", buffering=0) as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(exc, fp.write, "blah")
            self.assertRaises(exc, fp.writelines, ["blah\n"])
            # Non-zero seeking from current or end pos
            self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
            self.assertRaises(exc, fp.seek, -1, self.SEEK_END)

    def test_raw_file_io(self):
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space,
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                # Report a real skip (as test_unbounded_file does) instead
                # of printing and returning, which counts as a pass.
                self.skipTest(
                    "large file ops on %s require %d bytes and a long time; "
                    "use 'regrtest.py -u largefile test_io' to run them"
                    % (sys.platform, self.LARGE))
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                # The file must be closed even when the body raises.
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertTrue(f.tell() > 0)

    def test_destructor(self):
        # Destroying an unclosed FileIO subclass instance must call
        # __del__, close() and flush(), in that order.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        with support.check_warnings(('', ResourceWarning)):
            f = MyFileIO(support.TESTFN, "wb")
            f.write(b"xxx")
            del f
            support.gc_collect()
            self.assertEqual(record, [1, 2, 3])
            with self.open(support.TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        """Check that destroying an instance of a subclass of base calls
        __del__, close() and flush(), in that order."""
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        # write() must accept an array and report its size in bytes.
        a = array.array('i', range(10))
        n = len(a.tobytes())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        with support.check_warnings(('', ResourceWarning)):
            f = self.FileIO(support.TESTFN, "wb")
            f.write(b"abcxxx")
            # Create a reference cycle so only the GC can reclaim f.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertTrue(wr() is None, wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed

    def test_multi_close(self):
        # Closing repeatedly is allowed; using the closed file is not.
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")
612
class CIOTest(IOTest):
    # IOTest variant carrying one extra finalization regression test.

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        # Self-reference: the object can only be reclaimed by the GC.
        obj.obj = obj
        wr = weakref.ref(obj)
        del MyIO
        del obj
        support.gc_collect()
        self.assertTrue(wr() is None, wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000632
class PyIOTest(IOTest):
    # IOTest variant with no additional tests of its own.
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000635
Guido van Rossuma9e20242007-03-08 00:43:48 +0000636
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000637class CommonBufferedTests:
638 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
639
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000640 def test_detach(self):
641 raw = self.MockRawIO()
642 buf = self.tp(raw)
643 self.assertIs(buf.detach(), raw)
644 self.assertRaises(ValueError, buf.detach)
645
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000646 def test_fileno(self):
647 rawio = self.MockRawIO()
648 bufio = self.tp(rawio)
649
Ezio Melottib3aedd42010-11-20 19:04:17 +0000650 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000651
652 def test_no_fileno(self):
653 # XXX will we always have fileno() function? If so, kill
654 # this test. Else, write it.
655 pass
656
657 def test_invalid_args(self):
658 rawio = self.MockRawIO()
659 bufio = self.tp(rawio)
660 # Invalid whence
661 self.assertRaises(ValueError, bufio.seek, 0, -1)
662 self.assertRaises(ValueError, bufio.seek, 0, 3)
663
664 def test_override_destructor(self):
665 tp = self.tp
666 record = []
667 class MyBufferedIO(tp):
668 def __del__(self):
669 record.append(1)
670 try:
671 f = super().__del__
672 except AttributeError:
673 pass
674 else:
675 f()
676 def close(self):
677 record.append(2)
678 super().close()
679 def flush(self):
680 record.append(3)
681 super().flush()
682 rawio = self.MockRawIO()
683 bufio = MyBufferedIO(rawio)
684 writable = bufio.writable()
685 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000686 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000687 if writable:
688 self.assertEqual(record, [1, 2, 3])
689 else:
690 self.assertEqual(record, [1, 2])
691
692 def test_context_manager(self):
693 # Test usability as a context manager
694 rawio = self.MockRawIO()
695 bufio = self.tp(rawio)
696 def _with():
697 with bufio:
698 pass
699 _with()
700 # bufio should now be closed, and using it a second time should raise
701 # a ValueError.
702 self.assertRaises(ValueError, _with)
703
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    raw = self.CloseFailureIO()
    def touch_missing_attribute():
        # The temporary buffered object is dropped while the
        # AttributeError is in flight.
        self.tp(raw).xyzzy
    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    report = captured.getvalue().strip()
    if report:
        # The destructor *may* have printed an unraisable error, check it
        self.assertEqual(len(report.splitlines()), 1)
        self.assertTrue(report.startswith("Exception IOError: "), report)
        self.assertTrue(report.endswith(" ignored"), report)
Guido van Rossum78892e42007-04-06 17:31:18 +0000718
def test_repr(self):
    # repr() shows the module-qualified class name and, once the raw
    # stream has a ``name`` attribute, the repr of that name as well.
    raw = self.MockRawIO()
    buffered = self.tp(raw)
    qualified = "%s.%s" % (self.tp.__module__, self.tp.__name__)
    self.assertEqual(repr(buffered), "<%s>" % qualified)
    # A str name ...
    raw.name = "dummy"
    self.assertEqual(repr(buffered), "<%s name='dummy'>" % qualified)
    # ... and a bytes name.
    raw.name = b"dummy"
    self.assertEqual(repr(buffered), "<%s name=b'dummy'>" % qualified)
728
def test_flush_error_on_close(self):
    raw = self.MockRawIO()
    def failing_flush():
        raise IOError()
    # Make the raw stream's flush() fail unconditionally.
    raw.flush = failing_flush
    buffered = self.tp(raw)
    # close() must let the error through instead of swallowing it.
    with self.assertRaises(IOError):
        buffered.close()
736
def test_multi_close(self):
    raw = self.MockRawIO()
    buffered = self.tp(raw)
    # Closing again and again is allowed ...
    for _ in range(3):
        buffered.close()
    # ... but the closed object refuses further operations.
    with self.assertRaises(ValueError):
        buffered.flush()
744
def test_unseekable(self):
    # Positioning calls on top of an unseekable raw stream must raise
    # UnsupportedOperation.
    buffered = self.tp(self.MockUnseekableIO(b"A" * 10))
    unsupported = self.UnsupportedOperation
    with self.assertRaises(unsupported):
        buffered.tell()
    with self.assertRaises(unsupported):
        buffered.seek(0)
749
def test_readonly_attributes(self):
    buffered = self.tp(self.MockRawIO())
    replacement = self.MockRawIO()
    # The ``raw`` attribute cannot be rebound on an existing object.
    with self.assertRaises(AttributeError):
        buffered.raw = replacement
756
Guido van Rossum78892e42007-04-06 17:31:18 +0000757
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Reader tests written against ``self.tp``; subclasses bind ``tp`` to
    # the concrete class being exercised.
    read_mode = "rb"

    def test_constructor(self):
        raw = self.MockRawIO([b"abc"])
        reader = self.tp(raw)
        # Running __init__ again with valid arguments leaves a usable
        # object behind.
        reader.__init__(raw)
        reader.__init__(raw, buffer_size=1024)
        reader.__init__(raw, buffer_size=16)
        self.assertEqual(b"abc", reader.read())
        # Zero and negative buffer sizes are refused.
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                reader.__init__(raw, buffer_size=bad_size)
        raw = self.MockRawIO([b"abc"])
        reader.__init__(raw)
        self.assertEqual(b"abc", reader.read())

    def test_read(self):
        # None and an exact byte count must both return everything.
        for size in (None, 7):
            raw = self.MockRawIO((b"abc", b"d", b"efg"))
            reader = self.tp(raw)
            self.assertEqual(b"abcdefg", reader.read(size))
        # Invalid args
        with self.assertRaises(ValueError):
            reader.read(-2)

    def test_read1(self):
        raw = self.MockRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw)
        self.assertEqual(b"a", reader.read(1))
        self.assertEqual(b"b", reader.read1(1))
        self.assertEqual(raw._reads, 1)
        # (data returned by read1(100), raw read count afterwards)
        steps = ((b"c", 1), (b"d", 2), (b"efg", 3), (b"", 4))
        for chunk, raw_reads in steps:
            self.assertEqual(chunk, reader.read1(100))
            self.assertEqual(raw._reads, raw_reads)
        # Invalid args
        with self.assertRaises(ValueError):
            reader.read1(-1)

    def test_readinto(self):
        raw = self.MockRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw)
        target = bytearray(2)
        # (bytes reported by readinto(), buffer contents afterwards); a
        # short read only overwrites the front of the buffer.
        steps = ((2, b"ab"), (2, b"cd"), (2, b"ef"), (1, b"gf"), (0, b"gf"))
        for nread, contents in steps:
            self.assertEqual(reader.readinto(target), nread)
            self.assertEqual(target, contents)
        # Same exercise with a raw stream that yields None after its data.
        raw = self.MockRawIO((b"abc", None))
        reader = self.tp(raw)
        for nread, contents in ((2, b"ab"), (1, b"cb")):
            self.assertEqual(reader.readinto(target), nread)
            self.assertEqual(target, contents)

    def test_readlines(self):
        def fresh_reader():
            raw = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(raw)
        everything = [b"abc\n", b"d\n", b"ef"]
        self.assertEqual(fresh_reader().readlines(), everything)
        self.assertEqual(fresh_reader().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(fresh_reader().readlines(None), everything)

    def test_buffering(self):
        data = b"abcdefghi"
        total = len(data)

        # (buffer size, sizes read through the buffered object,
        #  read sizes expected in the raw object's history)
        scenarios = (
            (100, [3, 1, 4, 8], [total, 0]),
            (100, [3, 3, 3], [total]),
            (4, [1, 2, 4, 2], [4, 4, 1]),
        )

        for bufsize, requested, raw_history in scenarios:
            raw = self.MockFileIO(data)
            reader = self.tp(raw, buffer_size=bufsize)
            offset = 0
            for nbytes in requested:
                expected = data[offset:offset+nbytes]
                self.assertEqual(reader.read(nbytes), expected)
                offset += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(raw.read_history, raw_history)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        chunks = (b"abc", b"d", None, b"efg", None, None, None)
        raw = self.MockRawIO(chunks)
        reader = self.tp(raw)
        self.assertEqual(b"abcd", reader.read(6))
        self.assertEqual(b"e", reader.read(1))
        self.assertEqual(b"fg", reader.read())
        self.assertEqual(b"", reader.peek(1))
        self.assertIsNone(reader.read())
        self.assertEqual(b"", reader.read())

        # The raw mock's own readall() is checked the same way.
        raw = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", raw.readall())
        self.assertIsNone(raw.readall())

    def test_read_past_eof(self):
        # Asking for far more than is available returns what there is.
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", reader.read(9000))

    def test_read_all(self):
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", reader.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(bytearray(values))
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode,
                           buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def consume():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            chunk = bufio.read(n)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=consume)
                           for _ in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                # Every byte value must come back exactly N times overall.
                combined = b''.join(results)
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(combined.count(c), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        raw = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw)
        with self.assertRaises(IOError):
            reader.seek(0)
        with self.assertRaises(IOError):
            reader.tell()

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            layouts = (
                # Simple case: one raw read is enough to satisfy the
                # request.
                [b"x" * n],
                # A more complex case where two raw reads are needed to
                # satisfy the request.
                [b"x" * (n - 1), b"x"],
            )
            for chunks in layouts:
                raw = self.MockRawIO(chunks)
                reader = self.tp(raw, bufsize)
                self.assertEqual(reader.read(n), b"x" * n)
                self.assertEqual(raw._extraneous_reads, 0,
                    "failed for {}: {} != 0".format(
                        n, raw._extraneous_reads))
945
946
class CBufferedReaderTest(BufferedReaderTest):
    # Runs the shared reader tests against io.BufferedReader and adds
    # checks that only apply to that implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        # After each rejected __init__ call the object must refuse to
        # read as well.
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The test creates TESTFN; make sure it does not outlive the test.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Put the object in a reference cycle so that only the cycle
        # collector can reclaim it.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000987
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the shared reader tests against pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000990
Guido van Rossuma9e20242007-03-08 00:43:48 +0000991
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Writer tests written against ``self.tp``; subclasses bind ``tp`` to
    # the concrete class being exercised.
    write_mode = "wb"

    def test_constructor(self):
        raw = self.MockRawIO()
        writer = self.tp(raw)
        # Running __init__ again with valid arguments leaves a usable
        # object behind.
        writer.__init__(raw)
        writer.__init__(raw, buffer_size=1024)
        writer.__init__(raw, buffer_size=16)
        self.assertEqual(3, writer.write(b"abc"))
        writer.flush()
        # Zero and negative buffer sizes are refused.
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                writer.__init__(raw, buffer_size=bad_size)
        writer.__init__(raw)
        self.assertEqual(3, writer.write(b"ghi"))
        writer.flush()
        self.assertEqual(b"".join(raw._write_stack), b"abcghi")

    def test_detach_flush(self):
        raw = self.MockRawIO()
        writer = self.tp(raw)
        writer.write(b"howdy!")
        # Nothing reaches the raw stream until detach() flushes it.
        self.assertFalse(raw._write_stack)
        writer.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        raw = self.MockRawIO()
        writer = self.tp(raw, 8)
        writer.write(b"abc")
        self.assertFalse(raw._write_stack)

    def test_write_overflow(self):
        raw = self.MockRawIO()
        writer = self.tp(raw, 8)
        contents = b"abcdefghijklmnop"
        # Feed the data in three-byte slices.
        for start in range(0, len(contents), 3):
            writer.write(contents[start:start+3])
        flushed = b"".join(raw._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # intermediate_func is called with the buffered object after
        # every single write.
        contents = bytes(range(256)) * 1000
        raw = self.MockRawIO()
        writer = self.tp(raw, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        sizes = (size for size in count(1) for _ in range(15))
        written = 0
        while written < len(contents):
            size = min(next(sizes), len(contents) - written)
            chunk = contents[written:written+size]
            self.assertEqual(writer.write(chunk), size)
            intermediate_func(writer)
            written += size
        writer.flush()
        self.assertEqual(contents, b"".join(raw._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seek around after each write, always ending up back at the
        # position the write left off at.
        def seek_absolute(bufio):
            here = bufio.tell()
            bufio.seek(here + 1, 0)
            bufio.seek(here - 1, 0)
            bufio.seek(here, 0)
        self.check_writes(seek_absolute)
        def seek_relative(bufio):
            here = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(here, 0)
        self.check_writes(seek_relative)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        flushed = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(flushed.startswith(b"01234567A"), flushed)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        # Overwrite the first two bytes, then move back to the end.
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        raw = self.MockRawIO()
        writer = self.tp(raw, 8)
        writer.write(b"abc")
        writer.flush()
        self.assertEqual(b"abc", raw._write_stack[0])

    def test_destructor(self):
        # Dropping the last reference has to push buffered data out to
        # the raw stream.
        raw = self.MockRawIO()
        writer = self.tp(raw, 8)
        writer.write(b"abc")
        del writer
        support.gc_collect()
        self.assertEqual(b"abc", raw._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # The test creates TESTFN; make sure it does not outlive the test.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            # Truncating does not move the stream position.
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            # Pre-slice the data into alternating 1- and 19-byte pieces.
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode,
                           buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def drain_queue():
                    try:
                        while True:
                            try:
                                piece = queue.popleft()
                            except IndexError:
                                return
                            bufio.write(piece)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=drain_queue)
                           for _ in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Every byte value must have been written exactly N times.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        raw = self.MisbehavedRawIO()
        writer = self.tp(raw, 5)
        with self.assertRaises(IOError):
            writer.seek(0)
        with self.assertRaises(IOError):
            writer.tell()
        with self.assertRaises(IOError):
            writer.write(b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing a third positional argument must emit the deprecation
        # warning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001207
1208
class CBufferedWriterTest(BufferedWriterTest):
    # Runs the shared writer tests against io.BufferedWriter and adds
    # checks that only apply to that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # After each rejected __init__ call the object must refuse to
        # write as well.
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The test creates TESTFN; make sure it does not outlive the test.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Put the object in a reference cycle so that only the cycle
        # collector can reclaim it.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1246
1247
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared writer tests against pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001250
class BufferedRWPairTest(unittest.TestCase):
    # Tests for the reader/writer pair, written against ``self.tp``;
    # subclasses bind ``tp`` to the concrete class being exercised.

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_detach(self):
        # A pair has no single raw stream to hand back.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        with self.assertRaises(self.UnsupportedOperation):
            pair.detach()

    def test_constructor_max_buffer_size_deprecation(self):
        # Passing a fourth positional argument must emit the deprecation
        # warning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        # The first argument is the reader side and must be readable.
        with self.assertRaises(IOError):
            self.tp(NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        # The second argument is the writer side and must be writable.
        with self.assertRaises(IOError):
            self.tp(self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        # None as the size means "read everything" as well.
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        def fresh_pair():
            return self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        everything = [b"abc\n", b"def\n", b"h"]
        self.assertEqual(fresh_pair().readlines(), everything)
        # An explicit None hint must behave like no hint at all (this
        # used to be a verbatim repeat of the assertion above).
        self.assertEqual(fresh_pair().readlines(None), everything)
        self.assertEqual(fresh_pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        target = bytearray(5)
        self.assertEqual(pair.readinto(target), 5)
        self.assertEqual(target, b"abcde")

    def test_write(self):
        sink = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), sink)

        # Each write is flushed on its own, so each one shows up as a
        # separate entry on the writer side.
        for chunk in (b"abc", b"def"):
            pair.write(chunk)
            pair.flush()
        self.assertEqual(sink._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # peek() may return more than requested and must not consume
        # what it returns.
        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            # Raw stream whose isatty() answer is chosen at construction.
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # The pair is a tty as soon as either side is one.
        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        for reader_tty, writer_tty in ((True, False),
                                       (False, True),
                                       (True, True)):
            pair = self.tp(SelectableIsAtty(reader_tty),
                           SelectableIsAtty(writer_tty))
            self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001368
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared pair tests against io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001371
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared pair tests against pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001374
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001375
1376class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1377 read_mode = "rb+"
1378 write_mode = "wb+"
1379
def test_constructor(self):
    # Both parent classes define test_constructor, so each one is
    # invoked explicitly, reader checks first.
    for parent in (BufferedReaderTest, BufferedWriterTest):
        parent.test_constructor(self)
1383
def test_read_and_write(self):
    backing = self.MockRawIO((b"asdf", b"ghjk"))
    stream = self.tp(backing, 8)

    self.assertEqual(b"as", stream.read(2))
    for piece in (b"ddd", b"eee"):
        stream.write(piece)
    self.assertFalse(backing._write_stack) # Buffer writes
    # After the following read the two writes are expected to have
    # reached the raw stream as one combined write.
    self.assertEqual(b"ghjk", stream.read())
    self.assertEqual(b"dddeee", backing._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001394
def test_seek_and_tell(self):
    backing = self.BytesIO(b"asdfghjkl")
    stream = self.tp(backing)

    # Read, check the position, rewind and read again.
    self.assertEqual(b"as", stream.read(2))
    self.assertEqual(2, stream.tell())
    stream.seek(0, 0)
    self.assertEqual(b"asdf", stream.read(4))

    # Overwrite four bytes at the current position (offset 4).
    stream.write(b"123f")
    stream.seek(0, 0)
    self.assertEqual(b"asdf123fl", stream.read())
    self.assertEqual(9, stream.tell())
    # Seek relative to the end, then relative to the current position.
    stream.seek(-4, 2)
    self.assertEqual(5, stream.tell())
    stream.seek(2, 1)
    self.assertEqual(7, stream.tell())
    self.assertEqual(b"fl", stream.read(11))
    stream.flush()
    self.assertEqual(b"asdf123fl", backing.getvalue())

    # A float offset is not acceptable.
    with self.assertRaises(TypeError):
        stream.seek(0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001417
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001418 def check_flush_and_read(self, read_func):
1419 raw = self.BytesIO(b"abcdefghi")
1420 bufio = self.tp(raw)
1421
Ezio Melottib3aedd42010-11-20 19:04:17 +00001422 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001423 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001424 self.assertEqual(b"ef", read_func(bufio, 2))
1425 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001426 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001427 self.assertEqual(6, bufio.tell())
1428 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001429 raw.seek(0, 0)
1430 raw.write(b"XYZ")
1431 # flush() resets the read buffer
1432 bufio.flush()
1433 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001434 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001435
1436 def test_flush_and_read(self):
1437 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1438
1439 def test_flush_and_readinto(self):
1440 def _readinto(bufio, n=-1):
1441 b = bytearray(n if n >= 0 else 9999)
1442 n = bufio.readinto(b)
1443 return bytes(b[:n])
1444 self.check_flush_and_read(_readinto)
1445
1446 def test_flush_and_peek(self):
1447 def _peek(bufio, n=-1):
1448 # This relies on the fact that the buffer can contain the whole
1449 # raw stream, otherwise peek() can return less.
1450 b = bufio.peek(n)
1451 if n != -1:
1452 b = b[:n]
1453 bufio.seek(len(b), 1)
1454 return b
1455 self.check_flush_and_read(_peek)
1456
1457 def test_flush_and_write(self):
1458 raw = self.BytesIO(b"abcdefghi")
1459 bufio = self.tp(raw)
1460
1461 bufio.write(b"123")
1462 bufio.flush()
1463 bufio.write(b"45")
1464 bufio.flush()
1465 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001466 self.assertEqual(b"12345fghi", raw.getvalue())
1467 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001468
1469 def test_threads(self):
1470 BufferedReaderTest.test_threads(self)
1471 BufferedWriterTest.test_threads(self)
1472
1473 def test_writes_and_peek(self):
1474 def _peek(bufio):
1475 bufio.peek(1)
1476 self.check_writes(_peek)
1477 def _peek(bufio):
1478 pos = bufio.tell()
1479 bufio.seek(-1, 1)
1480 bufio.peek(1)
1481 bufio.seek(pos, 0)
1482 self.check_writes(_peek)
1483
1484 def test_writes_and_reads(self):
1485 def _read(bufio):
1486 bufio.seek(-1, 1)
1487 bufio.read(1)
1488 self.check_writes(_read)
1489
1490 def test_writes_and_read1s(self):
1491 def _read1(bufio):
1492 bufio.seek(-1, 1)
1493 bufio.read1(1)
1494 self.check_writes(_read1)
1495
1496 def test_writes_and_readintos(self):
1497 def _read(bufio):
1498 bufio.seek(-1, 1)
1499 bufio.readinto(bytearray(1))
1500 self.check_writes(_read)
1501
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001502 def test_write_after_readahead(self):
1503 # Issue #6629: writing after the buffer was filled by readahead should
1504 # first rewind the raw stream.
1505 for overwrite_size in [1, 5]:
1506 raw = self.BytesIO(b"A" * 10)
1507 bufio = self.tp(raw, 4)
1508 # Trigger readahead
1509 self.assertEqual(bufio.read(1), b"A")
1510 self.assertEqual(bufio.tell(), 1)
1511 # Overwriting should rewind the raw stream if it needs so
1512 bufio.write(b"B" * overwrite_size)
1513 self.assertEqual(bufio.tell(), overwrite_size + 1)
1514 # If the write size was smaller than the buffer size, flush() and
1515 # check that rewind happens.
1516 bufio.flush()
1517 self.assertEqual(bufio.tell(), overwrite_size + 1)
1518 s = raw.getvalue()
1519 self.assertEqual(s,
1520 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1521
Antoine Pitrou7c404892011-05-13 00:13:33 +02001522 def test_write_rewind_write(self):
1523 # Various combinations of reading / writing / seeking backwards / writing again
1524 def mutate(bufio, pos1, pos2):
1525 assert pos2 >= pos1
1526 # Fill the buffer
1527 bufio.seek(pos1)
1528 bufio.read(pos2 - pos1)
1529 bufio.write(b'\x02')
1530 # This writes earlier than the previous write, but still inside
1531 # the buffer.
1532 bufio.seek(pos1)
1533 bufio.write(b'\x01')
1534
1535 b = b"\x80\x81\x82\x83\x84"
1536 for i in range(0, len(b)):
1537 for j in range(i, len(b)):
1538 raw = self.BytesIO(b)
1539 bufio = self.tp(raw, 100)
1540 mutate(bufio, i, j)
1541 bufio.flush()
1542 expected = bytearray(b)
1543 expected[j] = 2
1544 expected[i] = 1
1545 self.assertEqual(raw.getvalue(), expected,
1546 "failed result for i=%d, j=%d" % (i, j))
1547
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001548 def test_truncate_after_read_or_write(self):
1549 raw = self.BytesIO(b"A" * 10)
1550 bufio = self.tp(raw, 100)
1551 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1552 self.assertEqual(bufio.truncate(), 2)
1553 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1554 self.assertEqual(bufio.truncate(), 4)
1555
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001556 def test_misbehaved_io(self):
1557 BufferedReaderTest.test_misbehaved_io(self)
1558 BufferedWriterTest.test_misbehaved_io(self)
1559
Antoine Pitroue05565e2011-08-20 14:39:23 +02001560 def test_interleaved_read_write(self):
1561 # Test for issue #12213
1562 with self.BytesIO(b'abcdefgh') as raw:
1563 with self.tp(raw, 100) as f:
1564 f.write(b"1")
1565 self.assertEqual(f.read(1), b'b')
1566 f.write(b'2')
1567 self.assertEqual(f.read1(1), b'd')
1568 f.write(b'3')
1569 buf = bytearray(1)
1570 f.readinto(buf)
1571 self.assertEqual(buf, b'f')
1572 f.write(b'4')
1573 self.assertEqual(f.peek(1), b'h')
1574 f.flush()
1575 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1576
1577 with self.BytesIO(b'abc') as raw:
1578 with self.tp(raw, 100) as f:
1579 self.assertEqual(f.read(1), b'a')
1580 f.write(b"2")
1581 self.assertEqual(f.read(1), b'c')
1582 f.flush()
1583 self.assertEqual(raw.getvalue(), b'a2c')
1584
1585 def test_interleaved_readline_write(self):
1586 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1587 with self.tp(raw) as f:
1588 f.write(b'1')
1589 self.assertEqual(f.readline(), b'b\n')
1590 f.write(b'2')
1591 self.assertEqual(f.readline(), b'def\n')
1592 f.write(b'3')
1593 self.assertEqual(f.readline(), b'\n')
1594 f.flush()
1595 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1596
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001597 # You can't construct a BufferedRandom over a non-seekable stream.
1598 test_unseekable = None
1599
class CBufferedRandomTest(BufferedRandomTest):
    """Run the BufferedRandom tests against ``io.BufferedRandom``."""

    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        expected_errors = (OverflowError, MemoryError, ValueError)
        self.assertRaises(expected_errors,
                          bufio.__init__, rawio, sys.maxsize)

    def test_garbage_collection(self):
        for parent in (CBufferedReaderTest, CBufferedWriterTest):
            parent.test_garbage_collection(self)
1616
class PyBufferedRandomTest(BufferedRandomTest):
    """Run the BufferedRandom tests against ``pyio.BufferedRandom``."""

    tp = pyio.BufferedRandom
1619
1620
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001621# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1622# properties:
1623# - A single output character can correspond to many bytes of input.
1624# - The number of input bytes to complete the character can be
1625# undetermined until the last input byte is received.
1626# - The number of input bytes can vary depending on previous input.
1627# - A single input byte can correspond to many characters of output.
1628# - The number of output characters can be undetermined until the
1629# last input byte is received.
1630# - The number of output characters can vary depending on previous input.
1631
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  A new I applies to the input bytes
    that follow the word which set it.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and an empty buffer."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return ``(buffered_bytes, flags)`` with ``flags = i*100 + o``.

        I and O are XORed with 1 before packing, so that the flags value
        is 0 right after reset().
        """
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a ``(buffered_bytes, flags)`` pair made by getstate()."""
        buffer, io = state
        self.buffer = bytearray(buffer)
        i, o = divmod(io, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Feed the bytes of *input* and return the text of completed words.

        Bytes of an unfinished word stay in ``self.buffer``; if *final* is
        true, a non-empty buffer is emitted as the last word.
        """
        # Collect the pieces and join once instead of repeated str +=.
        pieces = []
        period = ord('.')
        for b in input:
            # self.i is re-read on every byte: process_word() may change it.
            if self.i == 0: # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer: # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        'i'/'o' words update the lengths and produce '', any other word
        produces its padded/truncated form followed by a period.  The
        buffer is emptied in every case.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o # pad out with hyphens
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # Class-wide switch read by lookupTestDecoder(); off by default.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function for the name 'test_decoder'.

        Returns a CodecInfo that encodes as latin-1 and decodes
        incrementally with this class, or None when the codec is disabled
        or *name* is a different codec.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None
1716
1717# Register the previous decoder for testing.
1718# Disabled by default, tests will enable it.
1719codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1720
1721
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.'
         'b----------------------------.'
         'cde--------------------------.'
         'abcdefghijabcde.'
         'a.b------------.'
         '.c.------------.'
         'd.e------------.'
         'k--------------.'
         'l--------------.'
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for payload, is_final, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(payload, is_final), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001764
1765class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001766
def setUp(self):
    # Start every test without a leftover scratch file.
    support.unlink(support.TESTFN)
    # Raw sample mixing \r\n, \r and \n line endings, and its text form
    # with every line ending turned into \n.
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001771
def tearDown(self):
    # Remove the scratch file a test may have created.
    scratch = support.TESTFN
    support.unlink(scratch)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001774
def test_constructor(self):
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    text = self.TextIOWrapper(buffered)

    # Re-running __init__ must replace the earlier settings.
    text.__init__(buffered, encoding="latin-1", newline="\r\n")
    self.assertEqual(text.encoding, "latin-1")
    self.assertEqual(text.line_buffering, False)

    text.__init__(buffered, encoding="utf-8", line_buffering=True)
    self.assertEqual(text.encoding, "utf-8")
    self.assertEqual(text.line_buffering, True)
    self.assertEqual("\xe9\n", text.readline())

    # Bad newline arguments: wrong type, then an unsupported string.
    for error, bad_newline in ((TypeError, 42), (ValueError, 'xyzzy')):
        self.assertRaises(error, text.__init__, buffered,
                          newline=bad_newline)
1788
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)

    # detach() hands back the very object that was wrapped.
    first = self.TextIOWrapper(buffered)
    self.assertIs(first.detach(), buffered)

    second = self.TextIOWrapper(buffered, encoding="ascii")
    second.write("howdy")
    self.assertFalse(raw.getvalue())
    # Pending text reaches the raw stream when detaching ...
    second.detach()
    self.assertEqual(raw.getvalue(), b"howdy")
    # ... and detaching a second time is an error.
    self.assertRaises(ValueError, second.detach)
1801
def test_repr(self):
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    text = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    # No name, no mode: only the encoding is shown.
    expected = "<%s.TextIOWrapper encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    # A name set on the raw stream appears in the repr.
    raw.name = "dummy"
    expected = "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    # So does a mode set on the wrapper.
    text.mode = "r"
    expected = ("<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>"
                % modname)
    self.assertEqual(repr(text), expected)

    # A bytes name is shown with its b'' prefix.
    raw.name = b"dummy"
    expected = ("<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>"
                % modname)
    self.assertEqual(repr(text), expected)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001818
def test_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    text = self.TextIOWrapper(buffered, newline="\n", line_buffering=True)
    # (text written, expected content of the raw stream afterwards)
    steps = (
        ("X", b""),               # No flush happened
        ("Y\nZ", b"XY\nZ"),       # All got flushed
        ("A\rB", b"XY\nZA\rB"),
    )
    for written, raw_content in steps:
        text.write(written)
        self.assertEqual(raw.getvalue(), raw_content)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001829
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="utf-8")
    self.assertEqual(t.encoding, "utf-8")
    # Without an explicit encoding a default must still be reported ...
    t = self.TextIOWrapper(b)
    self.assertIsNotNone(t.encoding)
    # ... and it must name a codec that exists (lookup raises otherwise).
    codecs.lookup(t.encoding)
1838
def test_encoding_errors_reading(self):
    payload = b"abc\n\xff\n"

    # (1) default and (2) explicit strict: the bad byte raises
    for extra in ({}, {"errors": "strict"}):
        text = self.TextIOWrapper(self.BytesIO(payload),
                                  encoding="ascii", **extra)
        self.assertRaises(UnicodeError, text.read)

    # (3) ignore drops the bad byte, (4) replace substitutes U+FFFD
    lenient = (("ignore", "abc\n\n"), ("replace", "abc\n\ufffd\n"))
    for handler, decoded in lenient:
        text = self.TextIOWrapper(self.BytesIO(payload),
                                  encoding="ascii", errors=handler)
        self.assertEqual(text.read(), decoded)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001856
def test_encoding_errors_writing(self):
    # (1) default and (2) explicit strict: unencodable text raises
    for extra in ({}, {"errors": "strict"}):
        text = self.TextIOWrapper(self.BytesIO(), encoding="ascii", **extra)
        self.assertRaises(UnicodeError, text.write, "\xff")

    # (3) ignore drops the character, (4) replace writes "?" instead
    lenient = (("ignore", b"abcdef\n"), ("replace", b"abc?def\n"))
    for handler, encoded in lenient:
        raw = self.BytesIO()
        text = self.TextIOWrapper(raw, encoding="ascii", errors=handler,
                                  newline="\n")
        text.write("abc\xffdef\n")
        text.flush()
        self.assertEqual(raw.getvalue(), encoded)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001880
def test_newlines(self):
    input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]

    # (newline argument, lines expected back from the wrapper)
    tests = [
        [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
        [ '', input_lines ],
        [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
        [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
        [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        # str.encode() already returns bytes; no extra copy is needed.
        data = ''.join(input_lines).encode(encoding)
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    if do_reads:
                        # Mix read(2) with readline(): each line is
                        # rebuilt from its first two characters + rest.
                        got_lines = []
                        while True:
                            c2 = textio.read(2)
                            if c2 == '':
                                break
                            self.assertEqual(len(c2), 2)
                            got_lines.append(c2 + textio.readline())
                    else:
                        got_lines = list(textio)

                    # One comparison checks both the content and the
                    # number of lines.
                    self.assertEqual(got_lines, exp_lines)
Guido van Rossum78892e42007-04-06 17:31:18 +00001922
def test_newlines_input(self):
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    # newline argument -> lines that readlines() must return
    cases = [
        (None, normalized.decode("ascii").splitlines(True)),
        ("", testdata.decode("ascii").splitlines(True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, expected in cases:
        raw = self.BytesIO(testdata)
        text = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        self.assertEqual(text.readlines(), expected)
        # A whole-file read after rewinding gives the same text.
        text.seek(0)
        self.assertEqual(text.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00001938
def test_newlines_output(self):
    # newline argument -> bytes expected in the underlying stream
    testdict = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
    }
    # newline=None is expected to behave like the platform's os.linesep.
    tests = [(None, testdict[os.linesep])]
    tests += sorted(testdict.items())
    pieces = ("AAA\nB", "BB\nCCC\n", "X\rY\r\nZ")
    for newline, expected in tests:
        raw = self.BytesIO()
        text = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        for piece in pieces:
            text.write(piece)
        text.flush()
        self.assertEqual(raw.closed, False)
        self.assertEqual(raw.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001956
def test_destructor(self):
    seen_at_close = []
    bytesio_type = self.BytesIO

    class MyBytesIO(bytesio_type):
        def close(self):
            # Record what was written by the time the stream closes.
            seen_at_close.append(self.getvalue())
            bytesio_type.close(self)

    raw = MyBytesIO()
    text = self.TextIOWrapper(raw, encoding="ascii")
    text.write("abc")
    # Dropping the wrapper must flush the text and close the raw stream.
    del text
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001970
def test_override_destructor(self):
    calls = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # The parent class may or may not define __del__.
            try:
                parent_del = super().__del__
            except AttributeError:
                return
            parent_del()

        def close(self):
            calls.append(2)
            super().close()

        def flush(self):
            calls.append(3)
            super().flush()

    raw = self.BytesIO()
    text = MyTextIO(raw, encoding="ascii")
    del text
    support.gc_collect()
    # Expected order: __del__, then close(), then flush().
    self.assertEqual(calls, [1, 2, 3])
1993
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()

    def touch_missing_attribute():
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    report = captured.getvalue().strip()
    if not report:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(report.splitlines()), 1)
    self.assertTrue(report.startswith("Exception IOError: "), report)
    self.assertTrue(report.endswith(" ignored"), report)
Guido van Rossum8358db22007-08-18 21:39:55 +00002008
Guido van Rossum9b76da62007-04-11 01:09:03 +00002009 # Systematic tests of the text I/O API
2010
def test_basic_io(self):
    # utf-16-be and utf-16-le are not part of the encodings tried here.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin-1", "utf-8":
            # "with" closes the file even when an assertion fails, so a
            # failing iteration does not leak an open handle on TESTFN.
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position cookie for the end of the three characters.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2039
def multi_line_test(self, f, enc):
    # Helper: write lines of many lengths to *f*, noting tell() before
    # each one, then read them back and compare positions and text.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    sizes = (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000)
    written = []
    for size in sizes:
        # Cycle through the sample characters to build a line of *size*.
        body = "".join(sample[k % len(sample)] for k in range(size))
        line = body + "\n"
        written.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    read_back = []
    while True:
        position = f.tell()
        line = f.readline()
        if not line:
            break
        read_back.append((position, line))
    self.assertEqual(read_back, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002061
def test_telling(self):
    # "with" closes the file even when one of the assertions fails.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        # Record the position before and after each written line.
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is refused while the file is being iterated over.
            self.assertRaises(IOError, f.tell)
        # Once the iteration is over, tell() works again.
        self.assertEqual(f.tell(), p2)
2081
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002082 def test_seeking(self):
2083 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002084 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002085 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002086 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002087 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002088 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002089 suffix = bytes(u_suffix.encode("utf-8"))
2090 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002091 with self.open(support.TESTFN, "wb") as f:
2092 f.write(line*2)
2093 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2094 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002095 self.assertEqual(s, str(prefix, "ascii"))
2096 self.assertEqual(f.tell(), prefix_size)
2097 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002098
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002099 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002100 # Regression test for a specific bug
2101 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002102 with self.open(support.TESTFN, "wb") as f:
2103 f.write(data)
2104 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2105 f._CHUNK_SIZE # Just test that it exists
2106 f._CHUNK_SIZE = 2
2107 f.readline()
2108 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002109
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002110 def test_seek_and_tell(self):
2111 #Test seek/tell using the StatefulIncrementalDecoder.
2112 # Make test faster by doing smaller seeks
2113 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002114
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002115 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002116 """Tell/seek to various points within a data stream and ensure
2117 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002118 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002119 f.write(data)
2120 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002121 f = self.open(support.TESTFN, encoding='test_decoder')
2122 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002123 decoded = f.read()
2124 f.close()
2125
Neal Norwitze2b07052008-03-18 19:52:05 +00002126 for i in range(min_pos, len(decoded) + 1): # seek positions
2127 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002128 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002129 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002130 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002131 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002132 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002133 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002134 f.close()
2135
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002136 # Enable the test decoder.
2137 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002138
2139 # Run the tests.
2140 try:
2141 # Try each test case.
2142 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002143 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002144
2145 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002146 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2147 offset = CHUNK_SIZE - len(input)//2
2148 prefix = b'.'*offset
2149 # Don't bother seeking into the prefix (takes too long).
2150 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002151 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002152
2153 # Ensure our test decoder won't interfere with subsequent tests.
2154 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002155 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002156
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002157 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002158 data = "1234567890"
2159 tests = ("utf-16",
2160 "utf-16-le",
2161 "utf-16-be",
2162 "utf-32",
2163 "utf-32-le",
2164 "utf-32-be")
2165 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002166 buf = self.BytesIO()
2167 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002168 # Check if the BOM is written only once (see issue1753).
2169 f.write(data)
2170 f.write(data)
2171 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002172 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002173 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002174 self.assertEqual(f.read(), data * 2)
2175 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002176
Benjamin Petersona1b49012009-03-31 23:11:32 +00002177 def test_unreadable(self):
2178 class UnReadable(self.BytesIO):
2179 def readable(self):
2180 return False
2181 txt = self.TextIOWrapper(UnReadable())
2182 self.assertRaises(IOError, txt.read)
2183
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002184 def test_read_one_by_one(self):
2185 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002186 reads = ""
2187 while True:
2188 c = txt.read(1)
2189 if not c:
2190 break
2191 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002192 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002193
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002194 def test_readlines(self):
2195 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2196 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2197 txt.seek(0)
2198 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2199 txt.seek(0)
2200 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2201
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002202 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002203 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002204 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002205 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002206 reads = ""
2207 while True:
2208 c = txt.read(128)
2209 if not c:
2210 break
2211 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002212 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002213
2214 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002215 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002216
2217 # read one char at a time
2218 reads = ""
2219 while True:
2220 c = txt.read(1)
2221 if not c:
2222 break
2223 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002224 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002225
2226 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002227 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002228 txt._CHUNK_SIZE = 4
2229
2230 reads = ""
2231 while True:
2232 c = txt.read(4)
2233 if not c:
2234 break
2235 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002236 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002237
2238 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002239 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002240 txt._CHUNK_SIZE = 4
2241
2242 reads = txt.read(4)
2243 reads += txt.read(4)
2244 reads += txt.readline()
2245 reads += txt.readline()
2246 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002247 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002248
2249 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002250 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002251 txt._CHUNK_SIZE = 4
2252
2253 reads = txt.read(4)
2254 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002255 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002256
2257 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002258 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002259 txt._CHUNK_SIZE = 4
2260
2261 reads = txt.read(4)
2262 pos = txt.tell()
2263 txt.seek(0)
2264 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002265 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002266
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002267 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002268 buffer = self.BytesIO(self.testdata)
2269 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002270
2271 self.assertEqual(buffer.seekable(), txt.seekable())
2272
Antoine Pitroue4501852009-05-14 18:55:55 +00002273 def test_append_bom(self):
2274 # The BOM is not written again when appending to a non-empty file
2275 filename = support.TESTFN
2276 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2277 with self.open(filename, 'w', encoding=charset) as f:
2278 f.write('aaa')
2279 pos = f.tell()
2280 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002281 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002282
2283 with self.open(filename, 'a', encoding=charset) as f:
2284 f.write('xxx')
2285 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002286 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002287
2288 def test_seek_bom(self):
2289 # Same test, but when seeking manually
2290 filename = support.TESTFN
2291 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2292 with self.open(filename, 'w', encoding=charset) as f:
2293 f.write('aaa')
2294 pos = f.tell()
2295 with self.open(filename, 'r+', encoding=charset) as f:
2296 f.seek(pos)
2297 f.write('zzz')
2298 f.seek(0)
2299 f.write('bbb')
2300 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002301 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002302
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002303 def test_errors_property(self):
2304 with self.open(support.TESTFN, "w") as f:
2305 self.assertEqual(f.errors, "strict")
2306 with self.open(support.TESTFN, "w", errors="replace") as f:
2307 self.assertEqual(f.errors, "replace")
2308
Brett Cannon31f59292011-02-21 19:29:56 +00002309 @support.no_tracing
Victor Stinner45df8202010-04-28 22:31:17 +00002310 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002311 def test_threads_write(self):
2312 # Issue6750: concurrent writes could duplicate data
2313 event = threading.Event()
2314 with self.open(support.TESTFN, "w", buffering=1) as f:
2315 def run(n):
2316 text = "Thread%03d\n" % n
2317 event.wait()
2318 f.write(text)
2319 threads = [threading.Thread(target=lambda n=x: run(n))
2320 for x in range(20)]
2321 for t in threads:
2322 t.start()
2323 time.sleep(0.02)
2324 event.set()
2325 for t in threads:
2326 t.join()
2327 with self.open(support.TESTFN) as f:
2328 content = f.read()
2329 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002330 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002331
Antoine Pitrou6be88762010-05-03 16:48:20 +00002332 def test_flush_error_on_close(self):
2333 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2334 def bad_flush():
2335 raise IOError()
2336 txt.flush = bad_flush
2337 self.assertRaises(IOError, txt.close) # exception not swallowed
2338
2339 def test_multi_close(self):
2340 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2341 txt.close()
2342 txt.close()
2343 txt.close()
2344 self.assertRaises(ValueError, txt.flush)
2345
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002346 def test_unseekable(self):
2347 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2348 self.assertRaises(self.UnsupportedOperation, txt.tell)
2349 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2350
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002351 def test_readonly_attributes(self):
2352 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2353 buf = self.BytesIO(self.testdata)
2354 with self.assertRaises(AttributeError):
2355 txt.buffer = buf
2356
Antoine Pitroue96ec682011-07-23 21:46:35 +02002357 def test_rawio(self):
2358 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
2359 # that subprocess.Popen() can have the required unbuffered
2360 # semantics with universal_newlines=True.
2361 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2362 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
2363 # Reads
2364 self.assertEqual(txt.read(4), 'abcd')
2365 self.assertEqual(txt.readline(), 'efghi\n')
2366 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
2367
2368 def test_rawio_write_through(self):
2369 # Issue #12591: with write_through=True, writes don't need a flush
2370 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2371 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
2372 write_through=True)
2373 txt.write('1')
2374 txt.write('23\n4')
2375 txt.write('5')
2376 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2377
class CTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapperTest plus two checks added for the C variant."""

    def test_initialization(self):
        """A failed re-initialization leaves the wrapper unusable."""
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        # Each bad ``newline`` makes __init__ fail; read() must then raise
        # ValueError.
        for error, bad_newline in ((TypeError, 42), (ValueError, 'xyzzy')):
            self.assertRaises(error, wrapper.__init__, buffered,
                              newline=bad_newline)
            self.assertRaises(ValueError, wrapper.read)

    def test_garbage_collection(self):
        """Collecting a wrapper caught in a cycle flushes its data to disk."""
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        raw_file = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(raw_file)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        # Create a reference cycle so that only the collector can free it.
        wrapper.x = wrapper
        ref = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertTrue(ref() is None, ref)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")
2404
class PyTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapperTest variant that adds no tests of its own."""
2407
2408
2409class IncrementalNewlineDecoderTest(unittest.TestCase):
2410
2411 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002412 # UTF-8 specific tests for a newline decoder
2413 def _check_decode(b, s, **kwargs):
2414 # We exercise getstate() / setstate() as well as decode()
2415 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002416 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002417 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002418 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002419
Antoine Pitrou180a3362008-12-14 16:36:46 +00002420 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002421
Antoine Pitrou180a3362008-12-14 16:36:46 +00002422 _check_decode(b'\xe8', "")
2423 _check_decode(b'\xa2', "")
2424 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002425
Antoine Pitrou180a3362008-12-14 16:36:46 +00002426 _check_decode(b'\xe8', "")
2427 _check_decode(b'\xa2', "")
2428 _check_decode(b'\x88', "\u8888")
2429
2430 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002431 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2432
Antoine Pitrou180a3362008-12-14 16:36:46 +00002433 decoder.reset()
2434 _check_decode(b'\n', "\n")
2435 _check_decode(b'\r', "")
2436 _check_decode(b'', "\n", final=True)
2437 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002438
Antoine Pitrou180a3362008-12-14 16:36:46 +00002439 _check_decode(b'\r', "")
2440 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002441
Antoine Pitrou180a3362008-12-14 16:36:46 +00002442 _check_decode(b'\r\r\n', "\n\n")
2443 _check_decode(b'\r', "")
2444 _check_decode(b'\r', "\n")
2445 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002446
Antoine Pitrou180a3362008-12-14 16:36:46 +00002447 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2448 _check_decode(b'\xe8\xa2\x88', "\u8888")
2449 _check_decode(b'\n', "\n")
2450 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2451 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002452
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002453 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002454 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002455 if encoding is not None:
2456 encoder = codecs.getincrementalencoder(encoding)()
2457 def _decode_bytewise(s):
2458 # Decode one byte at a time
2459 for b in encoder.encode(s):
2460 result.append(decoder.decode(bytes([b])))
2461 else:
2462 encoder = None
2463 def _decode_bytewise(s):
2464 # Decode one char at a time
2465 for c in s:
2466 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002467 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002468 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002469 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00002470 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002471 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002472 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002473 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002474 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002475 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002476 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002477 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002478 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002479 input = "abc"
2480 if encoder is not None:
2481 encoder.reset()
2482 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002483 self.assertEqual(decoder.decode(input), "abc")
2484 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002485
2486 def test_newline_decoder(self):
2487 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002488 # None meaning the IncrementalNewlineDecoder takes unicode input
2489 # rather than bytes input
2490 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002491 'utf-16', 'utf-16-le', 'utf-16-be',
2492 'utf-32', 'utf-32-le', 'utf-32-be',
2493 )
2494 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002495 decoder = enc and codecs.getincrementaldecoder(enc)()
2496 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2497 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002498 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002499 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2500 self.check_newline_decoding_utf8(decoder)
2501
Antoine Pitrou66913e22009-03-06 23:40:56 +00002502 def test_newline_bytes(self):
2503 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2504 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002505 self.assertEqual(dec.newlines, None)
2506 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2507 self.assertEqual(dec.newlines, None)
2508 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2509 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00002510 dec = self.IncrementalNewlineDecoder(None, translate=False)
2511 _check(dec)
2512 dec = self.IncrementalNewlineDecoder(None, translate=True)
2513 _check(dec)
2514
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoderTest variant that adds no tests of its own."""
2517
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoderTest variant that adds no tests of its own."""
Antoine Pitrou180a3362008-12-14 16:36:46 +00002520
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002521
Guido van Rossum01a27522007-03-07 01:00:12 +00002522# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002523
class MiscIOTest(unittest.TestCase):
    """Miscellaneous checks on an io implementation.

    The module under test is read from ``self.io``; ``self.open`` and the
    other io names used below are not defined in this class.
    """

    def tearDown(self):
        """Remove the scratch file after each test."""
        support.unlink(support.TESTFN)

    def test___all__(self):
        """Every name in __all__ exists and is of the expected kind."""
        module = self.io
        for name in module.__all__:
            obj = getattr(module, name, None)
            self.assertTrue(obj is not None, name)
            if name == "open":
                continue
            looks_like_exception = ("error" in name.lower()
                                    or name == "UnsupportedOperation")
            if looks_like_exception:
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase))

    def test_attributes(self):
        """Check the name and mode attributes at every layer of a file."""
        path = support.TESTFN

        unbuffered = self.open(path, "wb", buffering=0)
        self.assertEqual(unbuffered.mode, "wb")
        unbuffered.close()

        text = self.open(path, "U")
        for layer in (text, text.buffer, text.buffer.raw):
            self.assertEqual(layer.name, path)
        self.assertEqual(text.mode, "U")
        for layer in (text.buffer, text.buffer.raw):
            self.assertEqual(layer.mode, "rb")
        text.close()

        f = self.open(path, "w+")
        self.assertEqual(f.mode, "w+")
        self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
        self.assertEqual(f.buffer.raw.mode, "rb+")

        # A second object sharing the descriptor reports the descriptor
        # number as its name.
        g = self.open(f.fileno(), "wb", closefd=False)
        for layer in (g, g.raw):
            self.assertEqual(layer.mode, "wb")
        for layer in (g, g.raw):
            self.assertEqual(layer.name, f.fileno())
        f.close()
        g.close()

    def test_io_after_close(self):
        """Every I/O method raises ValueError once the file is closed."""
        open_kwargs = [
            {"mode": "w"},
            {"mode": "wb"},
            {"mode": "w", "buffering": 1},
            {"mode": "w", "buffering": 2},
            {"mode": "wb", "buffering": 0},
            {"mode": "r"},
            {"mode": "rb"},
            {"mode": "r", "buffering": 1},
            {"mode": "r", "buffering": 2},
            {"mode": "rb", "buffering": 0},
            {"mode": "w+"},
            {"mode": "w+b"},
            {"mode": "w+", "buffering": 1},
            {"mode": "w+", "buffering": 2},
            {"mode": "w+b", "buffering": 0},
        ]
        # Methods that not every file object provides; they are checked
        # only when present.
        optional = ("peek", "read1", "readall", "readinto")
        for kwargs in open_kwargs:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            # (method name, positional arguments), in the order checked.
            calls = (
                ("flush", ()),
                ("fileno", ()),
                ("isatty", ()),
                ("__iter__", ()),
                ("peek", (1,)),
                ("read", ()),
                ("read1", (1024,)),
                ("readall", ()),
                ("readinto", (bytearray(1024),)),
                ("readline", ()),
                ("readlines", ()),
                ("seek", (0,)),
                ("tell", ()),
                ("truncate", ()),
            )
            for name, call_args in calls:
                if name in optional and not hasattr(f, name):
                    continue
                self.assertRaises(ValueError, getattr(f, name), *call_args)
            # write() needs bytes or str depending on the mode.
            empty = b"" if "b" in kwargs['mode'] else ""
            self.assertRaises(ValueError, f.write, empty)
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        """Various BlockingIOError issues."""
        for bad_args in ((), (1,), (1, 2, 3, 4), (1, "", None)):
            self.assertRaises(TypeError, self.BlockingIOError, *bad_args)
        err = self.BlockingIOError(1, "")
        self.assertEqual(err.characters_written, 0)
        class C(str):
            pass
        # Tie the exception and its message into a reference cycle and
        # check that the collector can free it.
        message = C("")
        err = self.BlockingIOError(1, message)
        message.b = err
        err.c = message
        ref = weakref.ref(message)
        del message, err
        support.gc_collect()
        self.assertTrue(ref() is None, ref)

    def test_abcs(self):
        """Test the visible base classes are ABCs."""
        for base in (self.IOBase, self.RawIOBase,
                     self.BufferedIOBase, self.TextIOBase):
            self.assertIsInstance(base, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        """Check each kind of file against the ABCs found on *abcmodule*."""
        layers = ("RawIOBase", "BufferedIOBase", "TextIOBase")
        # (mode, extra open() arguments, the one layer ABC that must match)
        cases = (
            ("wb", {"buffering": 0}, "RawIOBase"),
            ("wb", {}, "BufferedIOBase"),
            ("w", {}, "TextIOBase"),
        )
        for mode, extra, expected in cases:
            with self.open(support.TESTFN, mode, **extra) as f:
                self.assertIsInstance(f, abcmodule.IOBase)
                for layer in layers:
                    if layer == expected:
                        self.assertIsInstance(f, getattr(abcmodule, layer))
                    else:
                        self.assertNotIsInstance(f, getattr(abcmodule, layer))

    def test_abc_inheritance(self):
        """Test implementations inherit from their respective ABCs."""
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        """Test implementations inherit from the official ABCs of the
        baseline "io" module."""
        self._check_abc_inheritance(io)

    def _check_warn_on_dealloc(self, *args, **kwargs):
        """Open a file, drop it unclosed, and expect a ResourceWarning.

        The warning message must contain the repr() of the dropped object.
        """
        # NOTE(review): this calls the builtin open(), not self.open(), so
        # the same implementation is exercised by every subclass -- confirm
        # that this is intended.
        leaked = open(*args, **kwargs)
        expected_repr = repr(leaked)
        with self.assertWarns(ResourceWarning) as cm:
            del leaked
            support.gc_collect()
        self.assertIn(expected_repr, str(cm.warning.args[0]))

    def test_warn_on_dealloc(self):
        """Unclosed raw, buffered and text files each warn when dropped."""
        for extra in ({"buffering": 0}, {}):
            self._check_warn_on_dealloc(support.TESTFN, "wb", **extra)
        self._check_warn_on_dealloc(support.TESTFN, "w")

    def _check_warn_on_dealloc_fd(self, *args, **kwargs):
        """Like _check_warn_on_dealloc, but for files opened from a pipe fd."""
        fds = []
        def cleanup_fds():
            # Close whatever is still open; descriptors that are already
            # closed (EBADF) are skipped.
            for fd in fds:
                try:
                    os.close(fd)
                except EnvironmentError as e:
                    if e.errno == errno.EBADF:
                        continue
                    raise
        self.addCleanup(cleanup_fds)
        r, w = os.pipe()
        fds.extend((r, w))
        self._check_warn_on_dealloc(r, *args, **kwargs)
        # When using closefd=False, there's no warning
        r, w = os.pipe()
        fds.extend((r, w))
        with warnings.catch_warnings(record=True) as recorded:
            # NOTE(review): builtin open() again rather than self.open().
            open(r, *args, closefd=False, **kwargs)
            support.gc_collect()
        self.assertEqual(recorded, [])

    def test_warn_on_dealloc_fd(self):
        """Run the fd-based dealloc check for raw, buffered and text files."""
        self._check_warn_on_dealloc_fd("rb", buffering=0)
        for mode in ("rb", "r"):
            self._check_warn_on_dealloc_fd(mode)


    def test_pickling(self):
        """Pickling file objects is forbidden."""
        open_kwargs = [
            {"mode": "w"},
            {"mode": "wb"},
            {"mode": "wb", "buffering": 0},
            {"mode": "r"},
            {"mode": "rb"},
            {"mode": "rb", "buffering": 0},
            {"mode": "w+"},
            {"mode": "w+b"},
            {"mode": "w+b", "buffering": 0},
        ]
        protocols = range(pickle.HIGHEST_PROTOCOL + 1)
        for kwargs in open_kwargs:
            for protocol in protocols:
                with self.open(support.TESTFN, **kwargs) as f:
                    self.assertRaises(TypeError, pickle.dumps, f, protocol)
2718
class CMiscIOTest(MiscIOTest):
    """MiscIOTest run against the ``io`` module."""
    io = io
2721
# Variant of MiscIOTest whose ``io`` class attribute is ``pyio``.  Because
# the class name starts with "Py", test_main() also copies its Python
# namespace (py_io_ns) onto this class before running it.
class PyMiscIOTest(MiscIOTest):
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002724
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002725
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Check how file objects behave when SIGALRM interrupts their I/O.

    Subclasses provide the implementation under test through an ``io``
    class attribute; every helper opens its file objects with
    ``self.io.open``.  The ``check_*`` methods are parameterised helpers
    driven by the ``test_*`` methods.

    Every helper that arms an alarm cancels it again in its ``finally``
    clause, and tearDown() does so once more, so that a failing test
    cannot leave a pending SIGALRM behind.
    """

    def setUp(self):
        # Install a raising handler and remember the previous one so that
        # tearDown() can restore it.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        # Cancel whatever alarm may still be pending *before* putting the
        # previous handler back, so that it cannot fire into that handler.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Deliberately raises ZeroDivisionError; the tests look for that
        # exception to prove that the handler ran.
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        ``item`` is the unit of data that gets repeated to build the large
        write; ``bytes`` is the byte string whose first two bytes are
        expected to come out of the pipe.  ``fdopen_kwargs`` are passed to
        ``self.io.open`` (``closefd`` is forced to False)."""
        read_results = []
        def _read():
            # Keep SIGALRM away from this helper thread where possible.
            if hasattr(signal, 'pthread_sigmask'):
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # Bound before the try block so that the finally clause can tell
        # whether open() succeeded instead of failing with a NameError.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            signal.alarm(1)
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed above.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            self.assertRaises(ZeroDivisionError,
                        wio.write, item * (support.PIPE_MAX_SIZE // len(item)))
            t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            # Make sure the alarm cannot fire during cleanup or later on.
            signal.alarm(0)
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check what happens when a signal handler writes to a file
        object that is already in the middle of a write.

        ``data`` is written repeatedly from the main flow and once more
        from the SIGALRM handler.  Either RuntimeError (with a message
        starting with "reentrant call") or ZeroDivisionError must come
        out."""
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            # Make sure on_alarm() cannot run while wio is being closed.
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        ``decode`` converts the value returned by read() into a str for
        comparison.  ``fdopen_kwargs`` are passed to ``self.io.open``
        (``closefd`` is forced to False)."""
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound before the try block so that the finally clause can tell
        # whether open() succeeded instead of failing with a NameError.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            # Make sure the handler cannot write to a closed descriptor.
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        ``item`` is a one-element str or bytes that gets repeated N times
        to build the data written.  ``fdopen_kwargs`` are passed to
        ``self.io.open`` (``closefd`` is forced to False)."""
        select = support.import_module("select")
        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = 1024 * 1024
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        def _read():
            while not write_finished:
                while r in select.select([r], [], [], 1.0)[0]:
                    s = os.read(r, 1024)
                    read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: hand over to alarm2 and arm a second alarm.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            # Second alarm: start draining the pipe.
            t.start()
        signal.signal(signal.SIGALRM, alarm1)
        # Bound before the try block so that the finally clause can tell
        # whether open() succeeded instead of failing with a NameError.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            self.assertEqual(N, wio.write(item * N))
            wio.flush()
            write_finished = True
            t.join()
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            # Make sure neither alarm handler can run during cleanup.
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
2915
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002916
# Variant of SignalsTest whose ``io`` class attribute is the module-level
# name ``io``; the inherited helpers open their files with self.io.open.
class CSignalsTest(SignalsTest):
    io = io
2919
# Variant of SignalsTest whose ``io`` class attribute is ``pyio``; the
# inherited helpers open their files with self.io.open.
class PySignalsTest(SignalsTest):
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    # Rebinding the names to None replaces, for this subclass only, the
    # two test methods inherited from SignalsTest.
    test_reentrant_write_buffered = None
    test_reentrant_write_text = None
2927
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002928
def test_main():
    """Inject the io namespaces into the test classes, then run them.

    Classes whose name starts with "C" receive the names taken from
    ``io``; those whose name starts with "Py" receive the names taken
    from ``pyio``.  Classes matching neither prefix are left untouched.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    exported = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = dict((name, getattr(io, name)) for name in exported)
    py_io_ns = dict((name, getattr(pyio, name)) for name in exported)

    # Each mock is registered under its plain name but resolves to the
    # "C"- or "Py"-prefixed class found in this module's globals.
    module_globals = globals()
    for mock in mocks:
        c_io_ns[mock.__name__] = module_globals["C" + mock.__name__]
    for mock in mocks:
        py_io_ns[mock.__name__] = module_globals["Py" + mock.__name__]
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper

    namespace_by_prefix = (("C", c_io_ns), ("Py", py_io_ns))
    for test in tests:
        for prefix, namespace in namespace_by_prefix:
            if test.__name__.startswith(prefix):
                for name, obj in namespace.items():
                    setattr(test, name, obj)
                # Only the first matching prefix applies.
                break

    support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002963
# Run the whole suite through test_main() when this file is executed as a
# script rather than imported.
if __name__ == "__main__":
    test_main()