blob: 2c502ab5556250ba8e719e17f28000f1e6e4a0a9 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import random
Guido van Rossum28524c72007-02-27 05:47:44 +000027import unittest
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000028import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000029import abc
Antoine Pitroub46b9d52010-08-21 19:09:32 +000030import signal
31import errno
Antoine Pitroue033e062010-10-29 10:38:18 +000032import warnings
Antoine Pitrou243757e2010-11-05 21:15:39 +000033import pickle
Georg Brandl1b37e872010-03-14 10:45:50 +000034from itertools import cycle, count
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000035from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000036from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000037
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000038import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000039import io # C implementation of io
40import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000041try:
42 import threading
43except ImportError:
44 threading = None
Guido van Rossum28524c72007-02-27 05:47:44 +000045
Guido van Rossuma9e20242007-03-08 00:43:48 +000046
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000047def _default_chunk_size():
48 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000049 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000050 return f._CHUNK_SIZE
51
52
Antoine Pitrou328ec742010-09-14 18:37:24 +000053class MockRawIOWithoutRead:
54 """A RawIO implementation without read(), so as to exercise the default
55 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000056
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000057 def __init__(self, read_stack=()):
58 self._read_stack = list(read_stack)
59 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000060 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000061 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000062
Guido van Rossum01a27522007-03-07 01:00:12 +000063 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000064 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000065 return len(b)
66
67 def writable(self):
68 return True
69
Guido van Rossum68bbcd22007-02-27 17:19:33 +000070 def fileno(self):
71 return 42
72
73 def readable(self):
74 return True
75
Guido van Rossum01a27522007-03-07 01:00:12 +000076 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000077 return True
78
Guido van Rossum01a27522007-03-07 01:00:12 +000079 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000080 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000081
82 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000083 return 0 # same comment as above
84
85 def readinto(self, buf):
86 self._reads += 1
87 max_len = len(buf)
88 try:
89 data = self._read_stack[0]
90 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +000091 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000092 return 0
93 if data is None:
94 del self._read_stack[0]
95 return None
96 n = len(data)
97 if len(data) <= max_len:
98 del self._read_stack[0]
99 buf[:n] = data
100 return n
101 else:
102 buf[:] = data[:max_len]
103 self._read_stack[0] = data[max_len:]
104 return max_len
105
106 def truncate(self, pos=None):
107 return pos
108
Antoine Pitrou328ec742010-09-14 18:37:24 +0000109class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
110 pass
111
112class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
113 pass
114
115
116class MockRawIO(MockRawIOWithoutRead):
117
118 def read(self, n=None):
119 self._reads += 1
120 try:
121 return self._read_stack.pop(0)
122 except:
123 self._extraneous_reads += 1
124 return b""
125
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000126class CMockRawIO(MockRawIO, io.RawIOBase):
127 pass
128
129class PyMockRawIO(MockRawIO, pyio.RawIOBase):
130 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000131
Guido van Rossuma9e20242007-03-08 00:43:48 +0000132
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000133class MisbehavedRawIO(MockRawIO):
134 def write(self, b):
135 return super().write(b) * 2
136
137 def read(self, n=None):
138 return super().read(n) * 2
139
140 def seek(self, pos, whence):
141 return -123
142
143 def tell(self):
144 return -456
145
146 def readinto(self, buf):
147 super().readinto(buf)
148 return len(buf) * 5
149
150class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
151 pass
152
153class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
154 pass
155
156
157class CloseFailureIO(MockRawIO):
158 closed = 0
159
160 def close(self):
161 if not self.closed:
162 self.closed = 1
163 raise IOError
164
165class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
166 pass
167
168class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
169 pass
170
171
172class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000173
174 def __init__(self, data):
175 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000176 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000177
178 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000179 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000180 self.read_history.append(None if res is None else len(res))
181 return res
182
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000183 def readinto(self, b):
184 res = super().readinto(b)
185 self.read_history.append(res)
186 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000187
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000188class CMockFileIO(MockFileIO, io.BytesIO):
189 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000190
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000191class PyMockFileIO(MockFileIO, pyio.BytesIO):
192 pass
193
194
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000195class MockUnseekableIO:
196 def seekable(self):
197 return False
198
199 def seek(self, *args):
200 raise self.UnsupportedOperation("not seekable")
201
202 def tell(self, *args):
203 raise self.UnsupportedOperation("not seekable")
204
205class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
206 UnsupportedOperation = io.UnsupportedOperation
207
208class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
209 UnsupportedOperation = pyio.UnsupportedOperation
210
211
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000212class MockNonBlockWriterIO:
213
214 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000215 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000216 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000217
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000218 def pop_written(self):
219 s = b"".join(self._write_stack)
220 self._write_stack[:] = []
221 return s
222
223 def block_on(self, char):
224 """Block when a given char is encountered."""
225 self._blocker_char = char
226
227 def readable(self):
228 return True
229
230 def seekable(self):
231 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000232
Guido van Rossum01a27522007-03-07 01:00:12 +0000233 def writable(self):
234 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000235
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000236 def write(self, b):
237 b = bytes(b)
238 n = -1
239 if self._blocker_char:
240 try:
241 n = b.index(self._blocker_char)
242 except ValueError:
243 pass
244 else:
245 self._blocker_char = None
246 self._write_stack.append(b[:n])
247 raise self.BlockingIOError(0, "test blocking", n)
248 self._write_stack.append(b)
249 return len(b)
250
251class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
252 BlockingIOError = io.BlockingIOError
253
254class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
255 BlockingIOError = pyio.BlockingIOError
256
Guido van Rossuma9e20242007-03-08 00:43:48 +0000257
Guido van Rossum28524c72007-02-27 05:47:44 +0000258class IOTest(unittest.TestCase):
259
Neal Norwitze7789b12008-03-24 06:18:09 +0000260 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000261 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000262
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000263 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000264 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000265
Guido van Rossum28524c72007-02-27 05:47:44 +0000266 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000267 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000268 f.truncate(0)
269 self.assertEqual(f.tell(), 5)
270 f.seek(0)
271
272 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000273 self.assertEqual(f.seek(0), 0)
274 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000275 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000276 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000277 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000278 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000279 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000280 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000281 self.assertEqual(f.seek(-1, 2), 13)
282 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000283
Guido van Rossum87429772007-04-10 21:06:59 +0000284 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000285 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000286 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000287
Guido van Rossum9b76da62007-04-11 01:09:03 +0000288 def read_ops(self, f, buffered=False):
289 data = f.read(5)
290 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000291 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000292 self.assertEqual(f.readinto(data), 5)
293 self.assertEqual(data, b" worl")
294 self.assertEqual(f.readinto(data), 2)
295 self.assertEqual(len(data), 5)
296 self.assertEqual(data[:2], b"d\n")
297 self.assertEqual(f.seek(0), 0)
298 self.assertEqual(f.read(20), b"hello world\n")
299 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000300 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000301 self.assertEqual(f.seek(-6, 2), 6)
302 self.assertEqual(f.read(5), b"world")
303 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000304 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000305 self.assertEqual(f.seek(-6, 1), 5)
306 self.assertEqual(f.read(5), b" worl")
307 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000308 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000309 if buffered:
310 f.seek(0)
311 self.assertEqual(f.read(), b"hello world\n")
312 f.seek(6)
313 self.assertEqual(f.read(), b"world\n")
314 self.assertEqual(f.read(), b"")
315
Guido van Rossum34d69e52007-04-10 20:08:41 +0000316 LARGE = 2**31
317
Guido van Rossum53807da2007-04-10 19:01:47 +0000318 def large_file_ops(self, f):
319 assert f.readable()
320 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000321 self.assertEqual(f.seek(self.LARGE), self.LARGE)
322 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000323 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000324 self.assertEqual(f.tell(), self.LARGE + 3)
325 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000326 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000327 self.assertEqual(f.tell(), self.LARGE + 2)
328 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000329 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000330 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000331 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
332 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000333 self.assertEqual(f.read(2), b"x")
334
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000335 def test_invalid_operations(self):
336 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000337 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000338 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000339 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000340 self.assertRaises(exc, fp.read)
341 self.assertRaises(exc, fp.readline)
342 with self.open(support.TESTFN, "wb", buffering=0) as fp:
343 self.assertRaises(exc, fp.read)
344 self.assertRaises(exc, fp.readline)
345 with self.open(support.TESTFN, "rb", buffering=0) as fp:
346 self.assertRaises(exc, fp.write, b"blah")
347 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000348 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000349 self.assertRaises(exc, fp.write, b"blah")
350 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000351 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000352 self.assertRaises(exc, fp.write, "blah")
353 self.assertRaises(exc, fp.writelines, ["blah\n"])
354 # Non-zero seeking from current or end pos
355 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
356 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000357
Guido van Rossum28524c72007-02-27 05:47:44 +0000358 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000359 with self.open(support.TESTFN, "wb", buffering=0) as f:
360 self.assertEqual(f.readable(), False)
361 self.assertEqual(f.writable(), True)
362 self.assertEqual(f.seekable(), True)
363 self.write_ops(f)
364 with self.open(support.TESTFN, "rb", buffering=0) as f:
365 self.assertEqual(f.readable(), True)
366 self.assertEqual(f.writable(), False)
367 self.assertEqual(f.seekable(), True)
368 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000369
Guido van Rossum87429772007-04-10 21:06:59 +0000370 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000371 with self.open(support.TESTFN, "wb") as f:
372 self.assertEqual(f.readable(), False)
373 self.assertEqual(f.writable(), True)
374 self.assertEqual(f.seekable(), True)
375 self.write_ops(f)
376 with self.open(support.TESTFN, "rb") as f:
377 self.assertEqual(f.readable(), True)
378 self.assertEqual(f.writable(), False)
379 self.assertEqual(f.seekable(), True)
380 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000381
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000382 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000383 with self.open(support.TESTFN, "wb") as f:
384 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
385 with self.open(support.TESTFN, "rb") as f:
386 self.assertEqual(f.readline(), b"abc\n")
387 self.assertEqual(f.readline(10), b"def\n")
388 self.assertEqual(f.readline(2), b"xy")
389 self.assertEqual(f.readline(4), b"zzy\n")
390 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000391 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000392 self.assertRaises(TypeError, f.readline, 5.3)
393 with self.open(support.TESTFN, "r") as f:
394 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000395
Guido van Rossum28524c72007-02-27 05:47:44 +0000396 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000397 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000398 self.write_ops(f)
399 data = f.getvalue()
400 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000401 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000402 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000403
Guido van Rossum53807da2007-04-10 19:01:47 +0000404 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000405 # On Windows and Mac OSX this test comsumes large resources; It takes
406 # a long time to build the >2GB file and takes >2GB of disk space
407 # therefore the resource must be enabled to run this test.
408 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000409 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000410 print("\nTesting large file ops skipped on %s." % sys.platform,
411 file=sys.stderr)
412 print("It requires %d bytes and a long time." % self.LARGE,
413 file=sys.stderr)
414 print("Use 'regrtest.py -u largefile test_io' to run it.",
415 file=sys.stderr)
416 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000417 with self.open(support.TESTFN, "w+b", 0) as f:
418 self.large_file_ops(f)
419 with self.open(support.TESTFN, "w+b") as f:
420 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000421
422 def test_with_open(self):
423 for bufsize in (0, 1, 100):
424 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000425 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000426 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000427 self.assertEqual(f.closed, True)
428 f = None
429 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000430 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000431 1/0
432 except ZeroDivisionError:
433 self.assertEqual(f.closed, True)
434 else:
435 self.fail("1/0 didn't raise an exception")
436
Antoine Pitrou08838b62009-01-21 00:55:13 +0000437 # issue 5008
438 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000439 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000440 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000441 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000442 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000443 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000444 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000445 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000446 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000447
Guido van Rossum87429772007-04-10 21:06:59 +0000448 def test_destructor(self):
449 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000450 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000451 def __del__(self):
452 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000453 try:
454 f = super().__del__
455 except AttributeError:
456 pass
457 else:
458 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000459 def close(self):
460 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000461 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000462 def flush(self):
463 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000464 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000465 with support.check_warnings(('', ResourceWarning)):
466 f = MyFileIO(support.TESTFN, "wb")
467 f.write(b"xxx")
468 del f
469 support.gc_collect()
470 self.assertEqual(record, [1, 2, 3])
471 with self.open(support.TESTFN, "rb") as f:
472 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000473
474 def _check_base_destructor(self, base):
475 record = []
476 class MyIO(base):
477 def __init__(self):
478 # This exercises the availability of attributes on object
479 # destruction.
480 # (in the C version, close() is called by the tp_dealloc
481 # function, not by __del__)
482 self.on_del = 1
483 self.on_close = 2
484 self.on_flush = 3
485 def __del__(self):
486 record.append(self.on_del)
487 try:
488 f = super().__del__
489 except AttributeError:
490 pass
491 else:
492 f()
493 def close(self):
494 record.append(self.on_close)
495 super().close()
496 def flush(self):
497 record.append(self.on_flush)
498 super().flush()
499 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000500 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000501 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000502 self.assertEqual(record, [1, 2, 3])
503
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000504 def test_IOBase_destructor(self):
505 self._check_base_destructor(self.IOBase)
506
507 def test_RawIOBase_destructor(self):
508 self._check_base_destructor(self.RawIOBase)
509
510 def test_BufferedIOBase_destructor(self):
511 self._check_base_destructor(self.BufferedIOBase)
512
513 def test_TextIOBase_destructor(self):
514 self._check_base_destructor(self.TextIOBase)
515
Guido van Rossum87429772007-04-10 21:06:59 +0000516 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000517 with self.open(support.TESTFN, "wb") as f:
518 f.write(b"xxx")
519 with self.open(support.TESTFN, "rb") as f:
520 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000521
Guido van Rossumd4103952007-04-12 05:44:49 +0000522 def test_array_writes(self):
523 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000524 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000525 with self.open(support.TESTFN, "wb", 0) as f:
526 self.assertEqual(f.write(a), n)
527 with self.open(support.TESTFN, "wb") as f:
528 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000529
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000530 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000531 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000532 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000533
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000534 def test_read_closed(self):
535 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000536 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000537 with self.open(support.TESTFN, "r") as f:
538 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000539 self.assertEqual(file.read(), "egg\n")
540 file.seek(0)
541 file.close()
542 self.assertRaises(ValueError, file.read)
543
544 def test_no_closefd_with_filename(self):
545 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000546 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000547
548 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000549 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000550 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000551 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000552 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000553 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000554 self.assertEqual(file.buffer.raw.closefd, False)
555
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000556 def test_garbage_collection(self):
557 # FileIO objects are collected, and collecting them flushes
558 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000559 with support.check_warnings(('', ResourceWarning)):
560 f = self.FileIO(support.TESTFN, "wb")
561 f.write(b"abcxxx")
562 f.f = f
563 wr = weakref.ref(f)
564 del f
565 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000566 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000567 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000568 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000569
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000570 def test_unbounded_file(self):
571 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
572 zero = "/dev/zero"
573 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000574 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000575 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000576 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000577 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000578 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000579 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000580 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000581 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000582 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000583 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000584 self.assertRaises(OverflowError, f.read)
585
Antoine Pitrou6be88762010-05-03 16:48:20 +0000586 def test_flush_error_on_close(self):
587 f = self.open(support.TESTFN, "wb", buffering=0)
588 def bad_flush():
589 raise IOError()
590 f.flush = bad_flush
591 self.assertRaises(IOError, f.close) # exception not swallowed
592
593 def test_multi_close(self):
594 f = self.open(support.TESTFN, "wb", buffering=0)
595 f.close()
596 f.close()
597 f.close()
598 self.assertRaises(ValueError, f.flush)
599
Antoine Pitrou328ec742010-09-14 18:37:24 +0000600 def test_RawIOBase_read(self):
601 # Exercise the default RawIOBase.read() implementation (which calls
602 # readinto() internally).
603 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
604 self.assertEqual(rawio.read(2), b"ab")
605 self.assertEqual(rawio.read(2), b"c")
606 self.assertEqual(rawio.read(2), b"d")
607 self.assertEqual(rawio.read(2), None)
608 self.assertEqual(rawio.read(2), b"ef")
609 self.assertEqual(rawio.read(2), b"g")
610 self.assertEqual(rawio.read(2), None)
611 self.assertEqual(rawio.read(2), b"")
612
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400613 def test_types_have_dict(self):
614 test = (
615 self.IOBase(),
616 self.RawIOBase(),
617 self.TextIOBase(),
618 self.StringIO(),
619 self.BytesIO()
620 )
621 for obj in test:
622 self.assertTrue(hasattr(obj, "__dict__"))
623
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000624class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +0200625
626 def test_IOBase_finalize(self):
627 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
628 # class which inherits IOBase and an object of this class are caught
629 # in a reference cycle and close() is already in the method cache.
630 class MyIO(self.IOBase):
631 def close(self):
632 pass
633
634 # create an instance to populate the method cache
635 MyIO()
636 obj = MyIO()
637 obj.obj = obj
638 wr = weakref.ref(obj)
639 del MyIO
640 del obj
641 support.gc_collect()
642 self.assertTrue(wr() is None, wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000643
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000644class PyIOTest(IOTest):
645 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000646
Guido van Rossuma9e20242007-03-08 00:43:48 +0000647
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000648class CommonBufferedTests:
649 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
650
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000651 def test_detach(self):
652 raw = self.MockRawIO()
653 buf = self.tp(raw)
654 self.assertIs(buf.detach(), raw)
655 self.assertRaises(ValueError, buf.detach)
656
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000657 def test_fileno(self):
658 rawio = self.MockRawIO()
659 bufio = self.tp(rawio)
660
Ezio Melottib3aedd42010-11-20 19:04:17 +0000661 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000662
663 def test_no_fileno(self):
664 # XXX will we always have fileno() function? If so, kill
665 # this test. Else, write it.
666 pass
667
668 def test_invalid_args(self):
669 rawio = self.MockRawIO()
670 bufio = self.tp(rawio)
671 # Invalid whence
672 self.assertRaises(ValueError, bufio.seek, 0, -1)
673 self.assertRaises(ValueError, bufio.seek, 0, 3)
674
675 def test_override_destructor(self):
676 tp = self.tp
677 record = []
678 class MyBufferedIO(tp):
679 def __del__(self):
680 record.append(1)
681 try:
682 f = super().__del__
683 except AttributeError:
684 pass
685 else:
686 f()
687 def close(self):
688 record.append(2)
689 super().close()
690 def flush(self):
691 record.append(3)
692 super().flush()
693 rawio = self.MockRawIO()
694 bufio = MyBufferedIO(rawio)
695 writable = bufio.writable()
696 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000697 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000698 if writable:
699 self.assertEqual(record, [1, 2, 3])
700 else:
701 self.assertEqual(record, [1, 2])
702
703 def test_context_manager(self):
704 # Test usability as a context manager
705 rawio = self.MockRawIO()
706 bufio = self.tp(rawio)
707 def _with():
708 with bufio:
709 pass
710 _with()
711 # bufio should now be closed, and using it a second time should raise
712 # a ValueError.
713 self.assertRaises(ValueError, _with)
714
715 def test_error_through_destructor(self):
716 # Test that the exception state is not modified by a destructor,
717 # even if close() fails.
718 rawio = self.CloseFailureIO()
719 def f():
720 self.tp(rawio).xyzzy
721 with support.captured_output("stderr") as s:
722 self.assertRaises(AttributeError, f)
723 s = s.getvalue().strip()
724 if s:
725 # The destructor *may* have printed an unraisable error, check it
726 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000727 self.assertTrue(s.startswith("Exception IOError: "), s)
728 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000729
Antoine Pitrou716c4442009-05-23 19:04:03 +0000730 def test_repr(self):
731 raw = self.MockRawIO()
732 b = self.tp(raw)
733 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
734 self.assertEqual(repr(b), "<%s>" % clsname)
735 raw.name = "dummy"
736 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
737 raw.name = b"dummy"
738 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
739
Antoine Pitrou6be88762010-05-03 16:48:20 +0000740 def test_flush_error_on_close(self):
741 raw = self.MockRawIO()
742 def bad_flush():
743 raise IOError()
744 raw.flush = bad_flush
745 b = self.tp(raw)
746 self.assertRaises(IOError, b.close) # exception not swallowed
747
748 def test_multi_close(self):
749 raw = self.MockRawIO()
750 b = self.tp(raw)
751 b.close()
752 b.close()
753 b.close()
754 self.assertRaises(ValueError, b.flush)
755
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000756 def test_unseekable(self):
757 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
758 self.assertRaises(self.UnsupportedOperation, bufio.tell)
759 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
760
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000761 def test_readonly_attributes(self):
762 raw = self.MockRawIO()
763 buf = self.tp(raw)
764 x = self.MockRawIO()
765 with self.assertRaises(AttributeError):
766 buf.raw = x
767
Guido van Rossum78892e42007-04-06 17:31:18 +0000768
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000769class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
770 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000771
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000772 def test_constructor(self):
773 rawio = self.MockRawIO([b"abc"])
774 bufio = self.tp(rawio)
775 bufio.__init__(rawio)
776 bufio.__init__(rawio, buffer_size=1024)
777 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000778 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000779 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
780 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
781 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
782 rawio = self.MockRawIO([b"abc"])
783 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000784 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000785
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000786 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000787 for arg in (None, 7):
788 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
789 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000790 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000791 # Invalid args
792 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000793
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000794 def test_read1(self):
795 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
796 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000797 self.assertEqual(b"a", bufio.read(1))
798 self.assertEqual(b"b", bufio.read1(1))
799 self.assertEqual(rawio._reads, 1)
800 self.assertEqual(b"c", bufio.read1(100))
801 self.assertEqual(rawio._reads, 1)
802 self.assertEqual(b"d", bufio.read1(100))
803 self.assertEqual(rawio._reads, 2)
804 self.assertEqual(b"efg", bufio.read1(100))
805 self.assertEqual(rawio._reads, 3)
806 self.assertEqual(b"", bufio.read1(100))
807 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000808 # Invalid args
809 self.assertRaises(ValueError, bufio.read1, -1)
810
811 def test_readinto(self):
812 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
813 bufio = self.tp(rawio)
814 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000815 self.assertEqual(bufio.readinto(b), 2)
816 self.assertEqual(b, b"ab")
817 self.assertEqual(bufio.readinto(b), 2)
818 self.assertEqual(b, b"cd")
819 self.assertEqual(bufio.readinto(b), 2)
820 self.assertEqual(b, b"ef")
821 self.assertEqual(bufio.readinto(b), 1)
822 self.assertEqual(b, b"gf")
823 self.assertEqual(bufio.readinto(b), 0)
824 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +0200825 rawio = self.MockRawIO((b"abc", None))
826 bufio = self.tp(rawio)
827 self.assertEqual(bufio.readinto(b), 2)
828 self.assertEqual(b, b"ab")
829 self.assertEqual(bufio.readinto(b), 1)
830 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000831
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000832 def test_readlines(self):
833 def bufio():
834 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
835 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000836 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
837 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
838 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000839
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000840 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000841 data = b"abcdefghi"
842 dlen = len(data)
843
844 tests = [
845 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
846 [ 100, [ 3, 3, 3], [ dlen ] ],
847 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
848 ]
849
850 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000851 rawio = self.MockFileIO(data)
852 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000853 pos = 0
854 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +0000855 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000856 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000857 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +0000858 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000859
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000860 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000861 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000862 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
863 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000864 self.assertEqual(b"abcd", bufio.read(6))
865 self.assertEqual(b"e", bufio.read(1))
866 self.assertEqual(b"fg", bufio.read())
867 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +0200868 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000869 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000870
Victor Stinnera80987f2011-05-25 22:47:16 +0200871 rawio = self.MockRawIO((b"a", None, None))
872 self.assertEqual(b"a", rawio.readall())
873 self.assertIsNone(rawio.readall())
874
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000875 def test_read_past_eof(self):
876 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
877 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000878
Ezio Melottib3aedd42010-11-20 19:04:17 +0000879 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000880
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000881 def test_read_all(self):
882 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
883 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000884
Ezio Melottib3aedd42010-11-20 19:04:17 +0000885 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000886
Victor Stinner45df8202010-04-28 22:31:17 +0000887 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +0000888 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000889 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000890 try:
891 # Write out many bytes with exactly the same number of 0's,
892 # 1's... 255's. This will help us check that concurrent reading
893 # doesn't duplicate or forget contents.
894 N = 1000
895 l = list(range(256)) * N
896 random.shuffle(l)
897 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000898 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +0000899 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000900 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000901 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000902 errors = []
903 results = []
904 def f():
905 try:
906 # Intra-buffer read then buffer-flushing read
907 for n in cycle([1, 19]):
908 s = bufio.read(n)
909 if not s:
910 break
911 # list.append() is atomic
912 results.append(s)
913 except Exception as e:
914 errors.append(e)
915 raise
916 threads = [threading.Thread(target=f) for x in range(20)]
917 for t in threads:
918 t.start()
919 time.sleep(0.02) # yield
920 for t in threads:
921 t.join()
922 self.assertFalse(errors,
923 "the following exceptions were caught: %r" % errors)
924 s = b''.join(results)
925 for i in range(256):
926 c = bytes(bytearray([i]))
927 self.assertEqual(s.count(c), N)
928 finally:
929 support.unlink(support.TESTFN)
930
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000931 def test_misbehaved_io(self):
932 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
933 bufio = self.tp(rawio)
934 self.assertRaises(IOError, bufio.seek, 0)
935 self.assertRaises(IOError, bufio.tell)
936
Antoine Pitrou32cfede2010-08-11 13:31:33 +0000937 def test_no_extraneous_read(self):
938 # Issue #9550; when the raw IO object has satisfied the read request,
939 # we should not issue any additional reads, otherwise it may block
940 # (e.g. socket).
941 bufsize = 16
942 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
943 rawio = self.MockRawIO([b"x" * n])
944 bufio = self.tp(rawio, bufsize)
945 self.assertEqual(bufio.read(n), b"x" * n)
946 # Simple case: one raw read is enough to satisfy the request.
947 self.assertEqual(rawio._extraneous_reads, 0,
948 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
949 # A more complex case where two raw reads are needed to satisfy
950 # the request.
951 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
952 bufio = self.tp(rawio, bufsize)
953 self.assertEqual(bufio.read(n), b"x" * n)
954 self.assertEqual(rawio._extraneous_reads, 0,
955 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
956
957
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000958class CBufferedReaderTest(BufferedReaderTest):
959 tp = io.BufferedReader
960
961 def test_constructor(self):
962 BufferedReaderTest.test_constructor(self)
963 # The allocation can succeed on 32-bit builds, e.g. with more
964 # than 2GB RAM and a 64-bit kernel.
965 if sys.maxsize > 0x7FFFFFFF:
966 rawio = self.MockRawIO()
967 bufio = self.tp(rawio)
968 self.assertRaises((OverflowError, MemoryError, ValueError),
969 bufio.__init__, rawio, sys.maxsize)
970
971 def test_initialization(self):
972 rawio = self.MockRawIO([b"abc"])
973 bufio = self.tp(rawio)
974 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
975 self.assertRaises(ValueError, bufio.read)
976 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
977 self.assertRaises(ValueError, bufio.read)
978 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
979 self.assertRaises(ValueError, bufio.read)
980
981 def test_misbehaved_io_read(self):
982 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
983 bufio = self.tp(rawio)
984 # _pyio.BufferedReader seems to implement reading different, so that
985 # checking this is not so easy.
986 self.assertRaises(IOError, bufio.read, 10)
987
988 def test_garbage_collection(self):
989 # C BufferedReader objects are collected.
990 # The Python version has __del__, so it ends into gc.garbage instead
991 rawio = self.FileIO(support.TESTFN, "w+b")
992 f = self.tp(rawio)
993 f.f = f
994 wr = weakref.ref(f)
995 del f
Benjamin Peterson45cec322009-04-24 23:14:50 +0000996 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000997 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000998
999class PyBufferedReaderTest(BufferedReaderTest):
1000 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001001
Guido van Rossuma9e20242007-03-08 00:43:48 +00001002
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001003class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1004 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001005
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001006 def test_constructor(self):
1007 rawio = self.MockRawIO()
1008 bufio = self.tp(rawio)
1009 bufio.__init__(rawio)
1010 bufio.__init__(rawio, buffer_size=1024)
1011 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001012 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001013 bufio.flush()
1014 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1015 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1016 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1017 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001018 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001019 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001020 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001021
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001022 def test_detach_flush(self):
1023 raw = self.MockRawIO()
1024 buf = self.tp(raw)
1025 buf.write(b"howdy!")
1026 self.assertFalse(raw._write_stack)
1027 buf.detach()
1028 self.assertEqual(raw._write_stack, [b"howdy!"])
1029
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001030 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001031 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001032 writer = self.MockRawIO()
1033 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001034 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001035 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001036
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001037 def test_write_overflow(self):
1038 writer = self.MockRawIO()
1039 bufio = self.tp(writer, 8)
1040 contents = b"abcdefghijklmnop"
1041 for n in range(0, len(contents), 3):
1042 bufio.write(contents[n:n+3])
1043 flushed = b"".join(writer._write_stack)
1044 # At least (total - 8) bytes were implicitly flushed, perhaps more
1045 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001046 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001047
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001048 def check_writes(self, intermediate_func):
1049 # Lots of writes, test the flushed output is as expected.
1050 contents = bytes(range(256)) * 1000
1051 n = 0
1052 writer = self.MockRawIO()
1053 bufio = self.tp(writer, 13)
1054 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1055 def gen_sizes():
1056 for size in count(1):
1057 for i in range(15):
1058 yield size
1059 sizes = gen_sizes()
1060 while n < len(contents):
1061 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001062 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001063 intermediate_func(bufio)
1064 n += size
1065 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001066 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001067
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001068 def test_writes(self):
1069 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001070
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001071 def test_writes_and_flushes(self):
1072 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001073
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001074 def test_writes_and_seeks(self):
1075 def _seekabs(bufio):
1076 pos = bufio.tell()
1077 bufio.seek(pos + 1, 0)
1078 bufio.seek(pos - 1, 0)
1079 bufio.seek(pos, 0)
1080 self.check_writes(_seekabs)
1081 def _seekrel(bufio):
1082 pos = bufio.seek(0, 1)
1083 bufio.seek(+1, 1)
1084 bufio.seek(-1, 1)
1085 bufio.seek(pos, 0)
1086 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001087
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001088 def test_writes_and_truncates(self):
1089 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001090
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001091 def test_write_non_blocking(self):
1092 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001093 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001094
Ezio Melottib3aedd42010-11-20 19:04:17 +00001095 self.assertEqual(bufio.write(b"abcd"), 4)
1096 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001097 # 1 byte will be written, the rest will be buffered
1098 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001099 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001100
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001101 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1102 raw.block_on(b"0")
1103 try:
1104 bufio.write(b"opqrwxyz0123456789")
1105 except self.BlockingIOError as e:
1106 written = e.characters_written
1107 else:
1108 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001109 self.assertEqual(written, 16)
1110 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001111 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001112
Ezio Melottib3aedd42010-11-20 19:04:17 +00001113 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001114 s = raw.pop_written()
1115 # Previously buffered bytes were flushed
1116 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001117
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001118 def test_write_and_rewind(self):
1119 raw = io.BytesIO()
1120 bufio = self.tp(raw, 4)
1121 self.assertEqual(bufio.write(b"abcdef"), 6)
1122 self.assertEqual(bufio.tell(), 6)
1123 bufio.seek(0, 0)
1124 self.assertEqual(bufio.write(b"XY"), 2)
1125 bufio.seek(6, 0)
1126 self.assertEqual(raw.getvalue(), b"XYcdef")
1127 self.assertEqual(bufio.write(b"123456"), 6)
1128 bufio.flush()
1129 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001130
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001131 def test_flush(self):
1132 writer = self.MockRawIO()
1133 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001134 bufio.write(b"abc")
1135 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001136 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001137
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001138 def test_destructor(self):
1139 writer = self.MockRawIO()
1140 bufio = self.tp(writer, 8)
1141 bufio.write(b"abc")
1142 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001143 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001144 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001145
1146 def test_truncate(self):
1147 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001148 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001149 bufio = self.tp(raw, 8)
1150 bufio.write(b"abcdef")
1151 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001152 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001153 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001154 self.assertEqual(f.read(), b"abc")
1155
Victor Stinner45df8202010-04-28 22:31:17 +00001156 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001157 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001158 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001159 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001160 # Write out many bytes from many threads and test they were
1161 # all flushed.
1162 N = 1000
1163 contents = bytes(range(256)) * N
1164 sizes = cycle([1, 19])
1165 n = 0
1166 queue = deque()
1167 while n < len(contents):
1168 size = next(sizes)
1169 queue.append(contents[n:n+size])
1170 n += size
1171 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001172 # We use a real file object because it allows us to
1173 # exercise situations where the GIL is released before
1174 # writing the buffer to the raw streams. This is in addition
1175 # to concurrency issues due to switching threads in the middle
1176 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001177 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001178 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001179 errors = []
1180 def f():
1181 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001182 while True:
1183 try:
1184 s = queue.popleft()
1185 except IndexError:
1186 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001187 bufio.write(s)
1188 except Exception as e:
1189 errors.append(e)
1190 raise
1191 threads = [threading.Thread(target=f) for x in range(20)]
1192 for t in threads:
1193 t.start()
1194 time.sleep(0.02) # yield
1195 for t in threads:
1196 t.join()
1197 self.assertFalse(errors,
1198 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001199 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001200 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001201 s = f.read()
1202 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001203 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001204 finally:
1205 support.unlink(support.TESTFN)
1206
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001207 def test_misbehaved_io(self):
1208 rawio = self.MisbehavedRawIO()
1209 bufio = self.tp(rawio, 5)
1210 self.assertRaises(IOError, bufio.seek, 0)
1211 self.assertRaises(IOError, bufio.tell)
1212 self.assertRaises(IOError, bufio.write, b"abcdef")
1213
Benjamin Peterson59406a92009-03-26 17:10:29 +00001214 def test_max_buffer_size_deprecation(self):
Florent Xicluna8fbddf12010-03-17 20:29:51 +00001215 with support.check_warnings(("max_buffer_size is deprecated",
1216 DeprecationWarning)):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001217 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001218
1219
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001220class CBufferedWriterTest(BufferedWriterTest):
1221 tp = io.BufferedWriter
1222
1223 def test_constructor(self):
1224 BufferedWriterTest.test_constructor(self)
1225 # The allocation can succeed on 32-bit builds, e.g. with more
1226 # than 2GB RAM and a 64-bit kernel.
1227 if sys.maxsize > 0x7FFFFFFF:
1228 rawio = self.MockRawIO()
1229 bufio = self.tp(rawio)
1230 self.assertRaises((OverflowError, MemoryError, ValueError),
1231 bufio.__init__, rawio, sys.maxsize)
1232
1233 def test_initialization(self):
1234 rawio = self.MockRawIO()
1235 bufio = self.tp(rawio)
1236 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1237 self.assertRaises(ValueError, bufio.write, b"def")
1238 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1239 self.assertRaises(ValueError, bufio.write, b"def")
1240 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1241 self.assertRaises(ValueError, bufio.write, b"def")
1242
1243 def test_garbage_collection(self):
1244 # C BufferedWriter objects are collected, and collecting them flushes
1245 # all data to disk.
1246 # The Python version has __del__, so it ends into gc.garbage instead
1247 rawio = self.FileIO(support.TESTFN, "w+b")
1248 f = self.tp(rawio)
1249 f.write(b"123xxx")
1250 f.x = f
1251 wr = weakref.ref(f)
1252 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001253 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001254 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001255 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001256 self.assertEqual(f.read(), b"123xxx")
1257
1258
1259class PyBufferedWriterTest(BufferedWriterTest):
1260 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001261
Guido van Rossum01a27522007-03-07 01:00:12 +00001262class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001263
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001264 def test_constructor(self):
1265 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001266 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001267
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001268 def test_detach(self):
1269 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1270 self.assertRaises(self.UnsupportedOperation, pair.detach)
1271
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001272 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna8fbddf12010-03-17 20:29:51 +00001273 with support.check_warnings(("max_buffer_size is deprecated",
1274 DeprecationWarning)):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001275 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001276
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001277 def test_constructor_with_not_readable(self):
1278 class NotReadable(MockRawIO):
1279 def readable(self):
1280 return False
1281
1282 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1283
1284 def test_constructor_with_not_writeable(self):
1285 class NotWriteable(MockRawIO):
1286 def writable(self):
1287 return False
1288
1289 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1290
1291 def test_read(self):
1292 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1293
1294 self.assertEqual(pair.read(3), b"abc")
1295 self.assertEqual(pair.read(1), b"d")
1296 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001297 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1298 self.assertEqual(pair.read(None), b"abc")
1299
1300 def test_readlines(self):
1301 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1302 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1303 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1304 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001305
1306 def test_read1(self):
1307 # .read1() is delegated to the underlying reader object, so this test
1308 # can be shallow.
1309 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1310
1311 self.assertEqual(pair.read1(3), b"abc")
1312
1313 def test_readinto(self):
1314 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1315
1316 data = bytearray(5)
1317 self.assertEqual(pair.readinto(data), 5)
1318 self.assertEqual(data, b"abcde")
1319
1320 def test_write(self):
1321 w = self.MockRawIO()
1322 pair = self.tp(self.MockRawIO(), w)
1323
1324 pair.write(b"abc")
1325 pair.flush()
1326 pair.write(b"def")
1327 pair.flush()
1328 self.assertEqual(w._write_stack, [b"abc", b"def"])
1329
1330 def test_peek(self):
1331 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1332
1333 self.assertTrue(pair.peek(3).startswith(b"abc"))
1334 self.assertEqual(pair.read(3), b"abc")
1335
1336 def test_readable(self):
1337 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1338 self.assertTrue(pair.readable())
1339
1340 def test_writeable(self):
1341 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1342 self.assertTrue(pair.writable())
1343
1344 def test_seekable(self):
1345 # BufferedRWPairs are never seekable, even if their readers and writers
1346 # are.
1347 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1348 self.assertFalse(pair.seekable())
1349
1350 # .flush() is delegated to the underlying writer object and has been
1351 # tested in the test_write method.
1352
1353 def test_close_and_closed(self):
1354 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1355 self.assertFalse(pair.closed)
1356 pair.close()
1357 self.assertTrue(pair.closed)
1358
1359 def test_isatty(self):
1360 class SelectableIsAtty(MockRawIO):
1361 def __init__(self, isatty):
1362 MockRawIO.__init__(self)
1363 self._isatty = isatty
1364
1365 def isatty(self):
1366 return self._isatty
1367
1368 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1369 self.assertFalse(pair.isatty())
1370
1371 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1372 self.assertTrue(pair.isatty())
1373
1374 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1375 self.assertTrue(pair.isatty())
1376
1377 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1378 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001379
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001380class CBufferedRWPairTest(BufferedRWPairTest):
1381 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001382
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001383class PyBufferedRWPairTest(BufferedRWPairTest):
1384 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001385
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001386
1387class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1388 read_mode = "rb+"
1389 write_mode = "wb+"
1390
1391 def test_constructor(self):
1392 BufferedReaderTest.test_constructor(self)
1393 BufferedWriterTest.test_constructor(self)
1394
1395 def test_read_and_write(self):
1396 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001397 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001398
1399 self.assertEqual(b"as", rw.read(2))
1400 rw.write(b"ddd")
1401 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001402 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001403 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001404 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001405
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001406 def test_seek_and_tell(self):
1407 raw = self.BytesIO(b"asdfghjkl")
1408 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001409
Ezio Melottib3aedd42010-11-20 19:04:17 +00001410 self.assertEqual(b"as", rw.read(2))
1411 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001412 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001413 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00001414
Antoine Pitroue05565e2011-08-20 14:39:23 +02001415 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00001416 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02001417 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001418 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001419 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001420 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001421 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001422 self.assertEqual(7, rw.tell())
1423 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02001424 rw.flush()
1425 self.assertEqual(b"asdf123fl", raw.getvalue())
1426
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001427 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001428
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001429 def check_flush_and_read(self, read_func):
1430 raw = self.BytesIO(b"abcdefghi")
1431 bufio = self.tp(raw)
1432
Ezio Melottib3aedd42010-11-20 19:04:17 +00001433 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001434 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001435 self.assertEqual(b"ef", read_func(bufio, 2))
1436 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001437 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001438 self.assertEqual(6, bufio.tell())
1439 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001440 raw.seek(0, 0)
1441 raw.write(b"XYZ")
1442 # flush() resets the read buffer
1443 bufio.flush()
1444 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001445 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001446
1447 def test_flush_and_read(self):
1448 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1449
1450 def test_flush_and_readinto(self):
1451 def _readinto(bufio, n=-1):
1452 b = bytearray(n if n >= 0 else 9999)
1453 n = bufio.readinto(b)
1454 return bytes(b[:n])
1455 self.check_flush_and_read(_readinto)
1456
1457 def test_flush_and_peek(self):
1458 def _peek(bufio, n=-1):
1459 # This relies on the fact that the buffer can contain the whole
1460 # raw stream, otherwise peek() can return less.
1461 b = bufio.peek(n)
1462 if n != -1:
1463 b = b[:n]
1464 bufio.seek(len(b), 1)
1465 return b
1466 self.check_flush_and_read(_peek)
1467
1468 def test_flush_and_write(self):
1469 raw = self.BytesIO(b"abcdefghi")
1470 bufio = self.tp(raw)
1471
1472 bufio.write(b"123")
1473 bufio.flush()
1474 bufio.write(b"45")
1475 bufio.flush()
1476 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001477 self.assertEqual(b"12345fghi", raw.getvalue())
1478 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001479
1480 def test_threads(self):
1481 BufferedReaderTest.test_threads(self)
1482 BufferedWriterTest.test_threads(self)
1483
1484 def test_writes_and_peek(self):
1485 def _peek(bufio):
1486 bufio.peek(1)
1487 self.check_writes(_peek)
1488 def _peek(bufio):
1489 pos = bufio.tell()
1490 bufio.seek(-1, 1)
1491 bufio.peek(1)
1492 bufio.seek(pos, 0)
1493 self.check_writes(_peek)
1494
1495 def test_writes_and_reads(self):
1496 def _read(bufio):
1497 bufio.seek(-1, 1)
1498 bufio.read(1)
1499 self.check_writes(_read)
1500
1501 def test_writes_and_read1s(self):
1502 def _read1(bufio):
1503 bufio.seek(-1, 1)
1504 bufio.read1(1)
1505 self.check_writes(_read1)
1506
1507 def test_writes_and_readintos(self):
1508 def _read(bufio):
1509 bufio.seek(-1, 1)
1510 bufio.readinto(bytearray(1))
1511 self.check_writes(_read)
1512
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001513 def test_write_after_readahead(self):
1514 # Issue #6629: writing after the buffer was filled by readahead should
1515 # first rewind the raw stream.
1516 for overwrite_size in [1, 5]:
1517 raw = self.BytesIO(b"A" * 10)
1518 bufio = self.tp(raw, 4)
1519 # Trigger readahead
1520 self.assertEqual(bufio.read(1), b"A")
1521 self.assertEqual(bufio.tell(), 1)
1522 # Overwriting should rewind the raw stream if it needs so
1523 bufio.write(b"B" * overwrite_size)
1524 self.assertEqual(bufio.tell(), overwrite_size + 1)
1525 # If the write size was smaller than the buffer size, flush() and
1526 # check that rewind happens.
1527 bufio.flush()
1528 self.assertEqual(bufio.tell(), overwrite_size + 1)
1529 s = raw.getvalue()
1530 self.assertEqual(s,
1531 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1532
Antoine Pitrou7c404892011-05-13 00:13:33 +02001533 def test_write_rewind_write(self):
1534 # Various combinations of reading / writing / seeking backwards / writing again
1535 def mutate(bufio, pos1, pos2):
1536 assert pos2 >= pos1
1537 # Fill the buffer
1538 bufio.seek(pos1)
1539 bufio.read(pos2 - pos1)
1540 bufio.write(b'\x02')
1541 # This writes earlier than the previous write, but still inside
1542 # the buffer.
1543 bufio.seek(pos1)
1544 bufio.write(b'\x01')
1545
1546 b = b"\x80\x81\x82\x83\x84"
1547 for i in range(0, len(b)):
1548 for j in range(i, len(b)):
1549 raw = self.BytesIO(b)
1550 bufio = self.tp(raw, 100)
1551 mutate(bufio, i, j)
1552 bufio.flush()
1553 expected = bytearray(b)
1554 expected[j] = 2
1555 expected[i] = 1
1556 self.assertEqual(raw.getvalue(), expected,
1557 "failed result for i=%d, j=%d" % (i, j))
1558
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001559 def test_truncate_after_read_or_write(self):
1560 raw = self.BytesIO(b"A" * 10)
1561 bufio = self.tp(raw, 100)
1562 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1563 self.assertEqual(bufio.truncate(), 2)
1564 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1565 self.assertEqual(bufio.truncate(), 4)
1566
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001567 def test_misbehaved_io(self):
1568 BufferedReaderTest.test_misbehaved_io(self)
1569 BufferedWriterTest.test_misbehaved_io(self)
1570
Antoine Pitroue05565e2011-08-20 14:39:23 +02001571 def test_interleaved_read_write(self):
1572 # Test for issue #12213
1573 with self.BytesIO(b'abcdefgh') as raw:
1574 with self.tp(raw, 100) as f:
1575 f.write(b"1")
1576 self.assertEqual(f.read(1), b'b')
1577 f.write(b'2')
1578 self.assertEqual(f.read1(1), b'd')
1579 f.write(b'3')
1580 buf = bytearray(1)
1581 f.readinto(buf)
1582 self.assertEqual(buf, b'f')
1583 f.write(b'4')
1584 self.assertEqual(f.peek(1), b'h')
1585 f.flush()
1586 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1587
1588 with self.BytesIO(b'abc') as raw:
1589 with self.tp(raw, 100) as f:
1590 self.assertEqual(f.read(1), b'a')
1591 f.write(b"2")
1592 self.assertEqual(f.read(1), b'c')
1593 f.flush()
1594 self.assertEqual(raw.getvalue(), b'a2c')
1595
1596 def test_interleaved_readline_write(self):
1597 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1598 with self.tp(raw) as f:
1599 f.write(b'1')
1600 self.assertEqual(f.readline(), b'b\n')
1601 f.write(b'2')
1602 self.assertEqual(f.readline(), b'def\n')
1603 f.write(b'3')
1604 self.assertEqual(f.readline(), b'\n')
1605 f.flush()
1606 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1607
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001608 # You can't construct a BufferedRandom over a non-seekable stream.
1609 test_unseekable = None
1610
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001611class CBufferedRandomTest(BufferedRandomTest):
1612 tp = io.BufferedRandom
1613
1614 def test_constructor(self):
1615 BufferedRandomTest.test_constructor(self)
1616 # The allocation can succeed on 32-bit builds, e.g. with more
1617 # than 2GB RAM and a 64-bit kernel.
1618 if sys.maxsize > 0x7FFFFFFF:
1619 rawio = self.MockRawIO()
1620 bufio = self.tp(rawio)
1621 self.assertRaises((OverflowError, MemoryError, ValueError),
1622 bufio.__init__, rawio, sys.maxsize)
1623
1624 def test_garbage_collection(self):
1625 CBufferedReaderTest.test_garbage_collection(self)
1626 CBufferedWriterTest.test_garbage_collection(self)
1627
1628class PyBufferedRandomTest(BufferedRandomTest):
1629 tp = pyio.BufferedRandom
1630
1631
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001632# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1633# properties:
1634# - A single output character can correspond to many bytes of input.
1635# - The number of input bytes to complete the character can be
1636# undetermined until the last input byte is received.
1637# - The number of input bytes can vary depending on previous input.
1638# - A single input byte can correspond to many characters of output.
1639# - The number of output characters can be undetermined until the
1640# last input byte is received.
1641# - The number of output characters can vary depending on previous input.
1642
1643class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1644 """
1645 For testing seek/tell behavior with a stateful, buffering decoder.
1646
1647 Input is a sequence of words. Words may be fixed-length (length set
1648 by input) or variable-length (period-terminated). In variable-length
1649 mode, extra periods are ignored. Possible words are:
1650 - 'i' followed by a number sets the input length, I (maximum 99).
1651 When I is set to 0, words are space-terminated.
1652 - 'o' followed by a number sets the output length, O (maximum 99).
1653 - Any other word is converted into a word followed by a period on
1654 the output. The output word consists of the input word truncated
1655 or padded out with hyphens to make its length equal to O. If O
1656 is 0, the word is output verbatim without truncating or padding.
1657 I and O are initially set to 1. When I changes, any buffered input is
1658 re-scanned according to the new I. EOF also terminates the last word.
1659 """
1660
1661 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001662 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001663 self.reset()
1664
1665 def __repr__(self):
1666 return '<SID %x>' % id(self)
1667
1668 def reset(self):
1669 self.i = 1
1670 self.o = 1
1671 self.buffer = bytearray()
1672
1673 def getstate(self):
1674 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1675 return bytes(self.buffer), i*100 + o
1676
1677 def setstate(self, state):
1678 buffer, io = state
1679 self.buffer = bytearray(buffer)
1680 i, o = divmod(io, 100)
1681 self.i, self.o = i ^ 1, o ^ 1
1682
1683 def decode(self, input, final=False):
1684 output = ''
1685 for b in input:
1686 if self.i == 0: # variable-length, terminated with period
1687 if b == ord('.'):
1688 if self.buffer:
1689 output += self.process_word()
1690 else:
1691 self.buffer.append(b)
1692 else: # fixed-length, terminate after self.i bytes
1693 self.buffer.append(b)
1694 if len(self.buffer) == self.i:
1695 output += self.process_word()
1696 if final and self.buffer: # EOF terminates the last word
1697 output += self.process_word()
1698 return output
1699
1700 def process_word(self):
1701 output = ''
1702 if self.buffer[0] == ord('i'):
1703 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1704 elif self.buffer[0] == ord('o'):
1705 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1706 else:
1707 output = self.buffer.decode('ascii')
1708 if len(output) < self.o:
1709 output += '-'*self.o # pad out with hyphens
1710 if self.o:
1711 output = output[:self.o] # truncate to output length
1712 output += '.'
1713 self.buffer = bytearray()
1714 return output
1715
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001716 codecEnabled = False
1717
1718 @classmethod
1719 def lookupTestDecoder(cls, name):
1720 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001721 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001722 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001723 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001724 incrementalencoder=None,
1725 streamreader=None, streamwriter=None,
1726 incrementaldecoder=cls)
1727
1728# Register the previous decoder for testing.
1729# Disabled by default, tests will enable it.
1730codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1731
1732
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001733class StatefulIncrementalDecoderTest(unittest.TestCase):
1734 """
1735 Make sure the StatefulIncrementalDecoder actually works.
1736 """
1737
1738 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001739 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001740 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001741 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001742 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001743 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001744 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001745 # I=0, O=6 (variable-length input, fixed-length output)
1746 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1747 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001748 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001749 # I=6, O=3 (fixed-length input > fixed-length output)
1750 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1751 # I=0, then 3; O=29, then 15 (with longer output)
1752 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1753 'a----------------------------.' +
1754 'b----------------------------.' +
1755 'cde--------------------------.' +
1756 'abcdefghijabcde.' +
1757 'a.b------------.' +
1758 '.c.------------.' +
1759 'd.e------------.' +
1760 'k--------------.' +
1761 'l--------------.' +
1762 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001763 ]
1764
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001765 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001766 # Try a few one-shot test cases.
1767 for input, eof, output in self.test_cases:
1768 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001769 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001770
1771 # Also test an unfinished decode, followed by forcing EOF.
1772 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001773 self.assertEqual(d.decode(b'oiabcd'), '')
1774 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001775
1776class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001777
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001778 def setUp(self):
1779 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1780 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001781 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001782
Guido van Rossumd0712812007-04-11 16:32:43 +00001783 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001784 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001785
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001786 def test_constructor(self):
1787 r = self.BytesIO(b"\xc3\xa9\n\n")
1788 b = self.BufferedReader(r, 1000)
1789 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001790 t.__init__(b, encoding="latin-1", newline="\r\n")
1791 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001792 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001793 t.__init__(b, encoding="utf-8", line_buffering=True)
1794 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001795 self.assertEqual(t.line_buffering, True)
1796 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001797 self.assertRaises(TypeError, t.__init__, b, newline=42)
1798 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1799
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001800 def test_detach(self):
1801 r = self.BytesIO()
1802 b = self.BufferedWriter(r)
1803 t = self.TextIOWrapper(b)
1804 self.assertIs(t.detach(), b)
1805
1806 t = self.TextIOWrapper(b, encoding="ascii")
1807 t.write("howdy")
1808 self.assertFalse(r.getvalue())
1809 t.detach()
1810 self.assertEqual(r.getvalue(), b"howdy")
1811 self.assertRaises(ValueError, t.detach)
1812
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001813 def test_repr(self):
1814 raw = self.BytesIO("hello".encode("utf-8"))
1815 b = self.BufferedReader(raw)
1816 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001817 modname = self.TextIOWrapper.__module__
1818 self.assertEqual(repr(t),
1819 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1820 raw.name = "dummy"
1821 self.assertEqual(repr(t),
1822 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001823 t.mode = "r"
1824 self.assertEqual(repr(t),
1825 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001826 raw.name = b"dummy"
1827 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001828 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001829
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001830 def test_line_buffering(self):
1831 r = self.BytesIO()
1832 b = self.BufferedWriter(r, 1000)
1833 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001834 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001835 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001836 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001837 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001838 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001839 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001840
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001841 def test_encoding(self):
1842 # Check the encoding attribute is always set, and valid
1843 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001844 t = self.TextIOWrapper(b, encoding="utf-8")
1845 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001846 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001847 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001848 codecs.lookup(t.encoding)
1849
1850 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001851 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001852 b = self.BytesIO(b"abc\n\xff\n")
1853 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001854 self.assertRaises(UnicodeError, t.read)
1855 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001856 b = self.BytesIO(b"abc\n\xff\n")
1857 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001858 self.assertRaises(UnicodeError, t.read)
1859 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001860 b = self.BytesIO(b"abc\n\xff\n")
1861 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001862 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001863 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001864 b = self.BytesIO(b"abc\n\xff\n")
1865 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001866 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001867
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001868 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001869 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001870 b = self.BytesIO()
1871 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001872 self.assertRaises(UnicodeError, t.write, "\xff")
1873 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001874 b = self.BytesIO()
1875 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001876 self.assertRaises(UnicodeError, t.write, "\xff")
1877 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001878 b = self.BytesIO()
1879 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001880 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001881 t.write("abc\xffdef\n")
1882 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001883 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001884 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001885 b = self.BytesIO()
1886 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001887 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001888 t.write("abc\xffdef\n")
1889 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001890 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001891
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001892 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001893 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1894
1895 tests = [
1896 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001897 [ '', input_lines ],
1898 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1899 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1900 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001901 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001902 encodings = (
1903 'utf-8', 'latin-1',
1904 'utf-16', 'utf-16-le', 'utf-16-be',
1905 'utf-32', 'utf-32-le', 'utf-32-be',
1906 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001907
Guido van Rossum8358db22007-08-18 21:39:55 +00001908 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001909 # character in TextIOWrapper._pending_line.
1910 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001911 # XXX: str.encode() should return bytes
1912 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001913 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001914 for bufsize in range(1, 10):
1915 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001916 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1917 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001918 encoding=encoding)
1919 if do_reads:
1920 got_lines = []
1921 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001922 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001923 if c2 == '':
1924 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00001925 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001926 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001927 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001928 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001929
1930 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001931 self.assertEqual(got_line, exp_line)
1932 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00001933
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001934 def test_newlines_input(self):
1935 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001936 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1937 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001938 (None, normalized.decode("ascii").splitlines(True)),
1939 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001940 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1941 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1942 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001943 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001944 buf = self.BytesIO(testdata)
1945 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001946 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00001947 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001948 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00001949
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001950 def test_newlines_output(self):
1951 testdict = {
1952 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1953 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1954 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1955 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1956 }
1957 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1958 for newline, expected in tests:
1959 buf = self.BytesIO()
1960 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1961 txt.write("AAA\nB")
1962 txt.write("BB\nCCC\n")
1963 txt.write("X\rY\r\nZ")
1964 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001965 self.assertEqual(buf.closed, False)
1966 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001967
1968 def test_destructor(self):
1969 l = []
1970 base = self.BytesIO
1971 class MyBytesIO(base):
1972 def close(self):
1973 l.append(self.getvalue())
1974 base.close(self)
1975 b = MyBytesIO()
1976 t = self.TextIOWrapper(b, encoding="ascii")
1977 t.write("abc")
1978 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001979 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001980 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001981
1982 def test_override_destructor(self):
1983 record = []
1984 class MyTextIO(self.TextIOWrapper):
1985 def __del__(self):
1986 record.append(1)
1987 try:
1988 f = super().__del__
1989 except AttributeError:
1990 pass
1991 else:
1992 f()
1993 def close(self):
1994 record.append(2)
1995 super().close()
1996 def flush(self):
1997 record.append(3)
1998 super().flush()
1999 b = self.BytesIO()
2000 t = MyTextIO(b, encoding="ascii")
2001 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002002 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002003 self.assertEqual(record, [1, 2, 3])
2004
2005 def test_error_through_destructor(self):
2006 # Test that the exception state is not modified by a destructor,
2007 # even if close() fails.
2008 rawio = self.CloseFailureIO()
2009 def f():
2010 self.TextIOWrapper(rawio).xyzzy
2011 with support.captured_output("stderr") as s:
2012 self.assertRaises(AttributeError, f)
2013 s = s.getvalue().strip()
2014 if s:
2015 # The destructor *may* have printed an unraisable error, check it
2016 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002017 self.assertTrue(s.startswith("Exception IOError: "), s)
2018 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002019
Guido van Rossum9b76da62007-04-11 01:09:03 +00002020 # Systematic tests of the text I/O API
2021
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002022 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002023 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002024 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002025 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002026 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002027 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002028 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002029 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002030 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002031 self.assertEqual(f.tell(), 0)
2032 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002033 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002034 self.assertEqual(f.seek(0), 0)
2035 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002036 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002037 self.assertEqual(f.read(2), "ab")
2038 self.assertEqual(f.read(1), "c")
2039 self.assertEqual(f.read(1), "")
2040 self.assertEqual(f.read(), "")
2041 self.assertEqual(f.tell(), cookie)
2042 self.assertEqual(f.seek(0), 0)
2043 self.assertEqual(f.seek(0, 2), cookie)
2044 self.assertEqual(f.write("def"), 3)
2045 self.assertEqual(f.seek(cookie), cookie)
2046 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002047 if enc.startswith("utf"):
2048 self.multi_line_test(f, enc)
2049 f.close()
2050
2051 def multi_line_test(self, f, enc):
2052 f.seek(0)
2053 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002054 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002055 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002056 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002057 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002058 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002059 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002060 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002061 wlines.append((f.tell(), line))
2062 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002063 f.seek(0)
2064 rlines = []
2065 while True:
2066 pos = f.tell()
2067 line = f.readline()
2068 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002069 break
2070 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002071 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002072
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002073 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002074 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002075 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002076 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002077 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002078 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002079 p2 = f.tell()
2080 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002081 self.assertEqual(f.tell(), p0)
2082 self.assertEqual(f.readline(), "\xff\n")
2083 self.assertEqual(f.tell(), p1)
2084 self.assertEqual(f.readline(), "\xff\n")
2085 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002086 f.seek(0)
2087 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002088 self.assertEqual(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002089 self.assertRaises(IOError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002090 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002091 f.close()
2092
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002093 def test_seeking(self):
2094 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002095 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002096 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002097 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002098 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002099 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002100 suffix = bytes(u_suffix.encode("utf-8"))
2101 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002102 with self.open(support.TESTFN, "wb") as f:
2103 f.write(line*2)
2104 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2105 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002106 self.assertEqual(s, str(prefix, "ascii"))
2107 self.assertEqual(f.tell(), prefix_size)
2108 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002109
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002110 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002111 # Regression test for a specific bug
2112 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002113 with self.open(support.TESTFN, "wb") as f:
2114 f.write(data)
2115 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2116 f._CHUNK_SIZE # Just test that it exists
2117 f._CHUNK_SIZE = 2
2118 f.readline()
2119 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002120
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002121 def test_seek_and_tell(self):
2122 #Test seek/tell using the StatefulIncrementalDecoder.
2123 # Make test faster by doing smaller seeks
2124 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002125
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002126 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002127 """Tell/seek to various points within a data stream and ensure
2128 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002129 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002130 f.write(data)
2131 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002132 f = self.open(support.TESTFN, encoding='test_decoder')
2133 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002134 decoded = f.read()
2135 f.close()
2136
Neal Norwitze2b07052008-03-18 19:52:05 +00002137 for i in range(min_pos, len(decoded) + 1): # seek positions
2138 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002139 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002140 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002141 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002142 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002143 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002144 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002145 f.close()
2146
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002147 # Enable the test decoder.
2148 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002149
2150 # Run the tests.
2151 try:
2152 # Try each test case.
2153 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002154 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002155
2156 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002157 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2158 offset = CHUNK_SIZE - len(input)//2
2159 prefix = b'.'*offset
2160 # Don't bother seeking into the prefix (takes too long).
2161 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002162 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002163
2164 # Ensure our test decoder won't interfere with subsequent tests.
2165 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002166 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002167
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002168 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002169 data = "1234567890"
2170 tests = ("utf-16",
2171 "utf-16-le",
2172 "utf-16-be",
2173 "utf-32",
2174 "utf-32-le",
2175 "utf-32-be")
2176 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002177 buf = self.BytesIO()
2178 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002179 # Check if the BOM is written only once (see issue1753).
2180 f.write(data)
2181 f.write(data)
2182 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002183 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002184 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002185 self.assertEqual(f.read(), data * 2)
2186 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002187
Benjamin Petersona1b49012009-03-31 23:11:32 +00002188 def test_unreadable(self):
2189 class UnReadable(self.BytesIO):
2190 def readable(self):
2191 return False
2192 txt = self.TextIOWrapper(UnReadable())
2193 self.assertRaises(IOError, txt.read)
2194
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002195 def test_read_one_by_one(self):
2196 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002197 reads = ""
2198 while True:
2199 c = txt.read(1)
2200 if not c:
2201 break
2202 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002203 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002204
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002205 def test_readlines(self):
2206 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2207 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2208 txt.seek(0)
2209 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2210 txt.seek(0)
2211 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2212
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002213 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002214 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002215 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002216 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002217 reads = ""
2218 while True:
2219 c = txt.read(128)
2220 if not c:
2221 break
2222 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002223 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002224
2225 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002226 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002227
2228 # read one char at a time
2229 reads = ""
2230 while True:
2231 c = txt.read(1)
2232 if not c:
2233 break
2234 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002235 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002236
2237 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002238 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002239 txt._CHUNK_SIZE = 4
2240
2241 reads = ""
2242 while True:
2243 c = txt.read(4)
2244 if not c:
2245 break
2246 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002247 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002248
2249 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002250 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002251 txt._CHUNK_SIZE = 4
2252
2253 reads = txt.read(4)
2254 reads += txt.read(4)
2255 reads += txt.readline()
2256 reads += txt.readline()
2257 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002258 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002259
2260 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002261 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002262 txt._CHUNK_SIZE = 4
2263
2264 reads = txt.read(4)
2265 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002266 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002267
2268 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002269 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002270 txt._CHUNK_SIZE = 4
2271
2272 reads = txt.read(4)
2273 pos = txt.tell()
2274 txt.seek(0)
2275 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002276 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002277
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002278 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002279 buffer = self.BytesIO(self.testdata)
2280 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002281
2282 self.assertEqual(buffer.seekable(), txt.seekable())
2283
Antoine Pitroue4501852009-05-14 18:55:55 +00002284 def test_append_bom(self):
2285 # The BOM is not written again when appending to a non-empty file
2286 filename = support.TESTFN
2287 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2288 with self.open(filename, 'w', encoding=charset) as f:
2289 f.write('aaa')
2290 pos = f.tell()
2291 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002292 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002293
2294 with self.open(filename, 'a', encoding=charset) as f:
2295 f.write('xxx')
2296 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002297 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002298
2299 def test_seek_bom(self):
2300 # Same test, but when seeking manually
2301 filename = support.TESTFN
2302 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2303 with self.open(filename, 'w', encoding=charset) as f:
2304 f.write('aaa')
2305 pos = f.tell()
2306 with self.open(filename, 'r+', encoding=charset) as f:
2307 f.seek(pos)
2308 f.write('zzz')
2309 f.seek(0)
2310 f.write('bbb')
2311 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002312 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002313
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002314 def test_errors_property(self):
2315 with self.open(support.TESTFN, "w") as f:
2316 self.assertEqual(f.errors, "strict")
2317 with self.open(support.TESTFN, "w", errors="replace") as f:
2318 self.assertEqual(f.errors, "replace")
2319
Brett Cannon31f59292011-02-21 19:29:56 +00002320 @support.no_tracing
Victor Stinner45df8202010-04-28 22:31:17 +00002321 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002322 def test_threads_write(self):
2323 # Issue6750: concurrent writes could duplicate data
2324 event = threading.Event()
2325 with self.open(support.TESTFN, "w", buffering=1) as f:
2326 def run(n):
2327 text = "Thread%03d\n" % n
2328 event.wait()
2329 f.write(text)
2330 threads = [threading.Thread(target=lambda n=x: run(n))
2331 for x in range(20)]
2332 for t in threads:
2333 t.start()
2334 time.sleep(0.02)
2335 event.set()
2336 for t in threads:
2337 t.join()
2338 with self.open(support.TESTFN) as f:
2339 content = f.read()
2340 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002341 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002342
Antoine Pitrou6be88762010-05-03 16:48:20 +00002343 def test_flush_error_on_close(self):
2344 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2345 def bad_flush():
2346 raise IOError()
2347 txt.flush = bad_flush
2348 self.assertRaises(IOError, txt.close) # exception not swallowed
2349
2350 def test_multi_close(self):
2351 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2352 txt.close()
2353 txt.close()
2354 txt.close()
2355 self.assertRaises(ValueError, txt.flush)
2356
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002357 def test_unseekable(self):
2358 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2359 self.assertRaises(self.UnsupportedOperation, txt.tell)
2360 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2361
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002362 def test_readonly_attributes(self):
2363 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2364 buf = self.BytesIO(self.testdata)
2365 with self.assertRaises(AttributeError):
2366 txt.buffer = buf
2367
Antoine Pitroue96ec682011-07-23 21:46:35 +02002368 def test_rawio(self):
2369 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
2370 # that subprocess.Popen() can have the required unbuffered
2371 # semantics with universal_newlines=True.
2372 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2373 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
2374 # Reads
2375 self.assertEqual(txt.read(4), 'abcd')
2376 self.assertEqual(txt.readline(), 'efghi\n')
2377 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
2378
2379 def test_rawio_write_through(self):
2380 # Issue #12591: with write_through=True, writes don't need a flush
2381 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2382 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
2383 write_through=True)
2384 txt.write('1')
2385 txt.write('23\n4')
2386 txt.write('5')
2387 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2388
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002389class CTextIOWrapperTest(TextIOWrapperTest):
2390
2391 def test_initialization(self):
2392 r = self.BytesIO(b"\xc3\xa9\n\n")
2393 b = self.BufferedReader(r, 1000)
2394 t = self.TextIOWrapper(b)
2395 self.assertRaises(TypeError, t.__init__, b, newline=42)
2396 self.assertRaises(ValueError, t.read)
2397 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2398 self.assertRaises(ValueError, t.read)
2399
2400 def test_garbage_collection(self):
2401 # C TextIOWrapper objects are collected, and collecting them flushes
2402 # all data to disk.
2403 # The Python version has __del__, so it ends in gc.garbage instead.
2404 rawio = io.FileIO(support.TESTFN, "wb")
2405 b = self.BufferedWriter(rawio)
2406 t = self.TextIOWrapper(b, encoding="ascii")
2407 t.write("456def")
2408 t.x = t
2409 wr = weakref.ref(t)
2410 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002411 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002412 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002413 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002414 self.assertEqual(f.read(), b"456def")
2415
2416class PyTextIOWrapperTest(TextIOWrapperTest):
2417 pass
2418
2419
2420class IncrementalNewlineDecoderTest(unittest.TestCase):
2421
2422 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002423 # UTF-8 specific tests for a newline decoder
2424 def _check_decode(b, s, **kwargs):
2425 # We exercise getstate() / setstate() as well as decode()
2426 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002427 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002428 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002429 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002430
Antoine Pitrou180a3362008-12-14 16:36:46 +00002431 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002432
Antoine Pitrou180a3362008-12-14 16:36:46 +00002433 _check_decode(b'\xe8', "")
2434 _check_decode(b'\xa2', "")
2435 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002436
Antoine Pitrou180a3362008-12-14 16:36:46 +00002437 _check_decode(b'\xe8', "")
2438 _check_decode(b'\xa2', "")
2439 _check_decode(b'\x88', "\u8888")
2440
2441 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002442 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2443
Antoine Pitrou180a3362008-12-14 16:36:46 +00002444 decoder.reset()
2445 _check_decode(b'\n', "\n")
2446 _check_decode(b'\r', "")
2447 _check_decode(b'', "\n", final=True)
2448 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002449
Antoine Pitrou180a3362008-12-14 16:36:46 +00002450 _check_decode(b'\r', "")
2451 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002452
Antoine Pitrou180a3362008-12-14 16:36:46 +00002453 _check_decode(b'\r\r\n', "\n\n")
2454 _check_decode(b'\r', "")
2455 _check_decode(b'\r', "\n")
2456 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002457
Antoine Pitrou180a3362008-12-14 16:36:46 +00002458 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2459 _check_decode(b'\xe8\xa2\x88', "\u8888")
2460 _check_decode(b'\n', "\n")
2461 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2462 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002463
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002464 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002465 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002466 if encoding is not None:
2467 encoder = codecs.getincrementalencoder(encoding)()
2468 def _decode_bytewise(s):
2469 # Decode one byte at a time
2470 for b in encoder.encode(s):
2471 result.append(decoder.decode(bytes([b])))
2472 else:
2473 encoder = None
2474 def _decode_bytewise(s):
2475 # Decode one char at a time
2476 for c in s:
2477 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002478 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002479 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002480 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00002481 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002482 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002483 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002484 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002485 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002486 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002487 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002488 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002489 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002490 input = "abc"
2491 if encoder is not None:
2492 encoder.reset()
2493 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002494 self.assertEqual(decoder.decode(input), "abc")
2495 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002496
2497 def test_newline_decoder(self):
2498 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002499 # None meaning the IncrementalNewlineDecoder takes unicode input
2500 # rather than bytes input
2501 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002502 'utf-16', 'utf-16-le', 'utf-16-be',
2503 'utf-32', 'utf-32-le', 'utf-32-be',
2504 )
2505 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002506 decoder = enc and codecs.getincrementaldecoder(enc)()
2507 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2508 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002509 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002510 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2511 self.check_newline_decoding_utf8(decoder)
2512
Antoine Pitrou66913e22009-03-06 23:40:56 +00002513 def test_newline_bytes(self):
2514 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2515 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002516 self.assertEqual(dec.newlines, None)
2517 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2518 self.assertEqual(dec.newlines, None)
2519 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2520 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00002521 dec = self.IncrementalNewlineDecoder(None, translate=False)
2522 _check(dec)
2523 dec = self.IncrementalNewlineDecoder(None, translate=True)
2524 _check(dec)
2525
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002526class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2527 pass
2528
2529class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2530 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002531
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002532
Guido van Rossum01a27522007-03-07 01:00:12 +00002533# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002534
Guido van Rossum5abbf752007-08-27 17:39:33 +00002535class MiscIOTest(unittest.TestCase):
2536
Barry Warsaw40e82462008-11-20 20:14:50 +00002537 def tearDown(self):
2538 support.unlink(support.TESTFN)
2539
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002540 def test___all__(self):
2541 for name in self.io.__all__:
2542 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002543 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002544 if name == "open":
2545 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002546 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002547 self.assertTrue(issubclass(obj, Exception), name)
2548 elif not name.startswith("SEEK_"):
2549 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002550
Barry Warsaw40e82462008-11-20 20:14:50 +00002551 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002552 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002553 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002554 f.close()
2555
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002556 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002557 self.assertEqual(f.name, support.TESTFN)
2558 self.assertEqual(f.buffer.name, support.TESTFN)
2559 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2560 self.assertEqual(f.mode, "U")
2561 self.assertEqual(f.buffer.mode, "rb")
2562 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002563 f.close()
2564
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002565 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002566 self.assertEqual(f.mode, "w+")
2567 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2568 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002569
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002570 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002571 self.assertEqual(g.mode, "wb")
2572 self.assertEqual(g.raw.mode, "wb")
2573 self.assertEqual(g.name, f.fileno())
2574 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00002575 f.close()
2576 g.close()
2577
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002578 def test_io_after_close(self):
2579 for kwargs in [
2580 {"mode": "w"},
2581 {"mode": "wb"},
2582 {"mode": "w", "buffering": 1},
2583 {"mode": "w", "buffering": 2},
2584 {"mode": "wb", "buffering": 0},
2585 {"mode": "r"},
2586 {"mode": "rb"},
2587 {"mode": "r", "buffering": 1},
2588 {"mode": "r", "buffering": 2},
2589 {"mode": "rb", "buffering": 0},
2590 {"mode": "w+"},
2591 {"mode": "w+b"},
2592 {"mode": "w+", "buffering": 1},
2593 {"mode": "w+", "buffering": 2},
2594 {"mode": "w+b", "buffering": 0},
2595 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002596 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002597 f.close()
2598 self.assertRaises(ValueError, f.flush)
2599 self.assertRaises(ValueError, f.fileno)
2600 self.assertRaises(ValueError, f.isatty)
2601 self.assertRaises(ValueError, f.__iter__)
2602 if hasattr(f, "peek"):
2603 self.assertRaises(ValueError, f.peek, 1)
2604 self.assertRaises(ValueError, f.read)
2605 if hasattr(f, "read1"):
2606 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02002607 if hasattr(f, "readall"):
2608 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002609 if hasattr(f, "readinto"):
2610 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2611 self.assertRaises(ValueError, f.readline)
2612 self.assertRaises(ValueError, f.readlines)
2613 self.assertRaises(ValueError, f.seek, 0)
2614 self.assertRaises(ValueError, f.tell)
2615 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002616 self.assertRaises(ValueError, f.write,
2617 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002618 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002619 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002620
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002621 def test_blockingioerror(self):
2622 # Various BlockingIOError issues
2623 self.assertRaises(TypeError, self.BlockingIOError)
2624 self.assertRaises(TypeError, self.BlockingIOError, 1)
2625 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2626 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2627 b = self.BlockingIOError(1, "")
2628 self.assertEqual(b.characters_written, 0)
2629 class C(str):
2630 pass
2631 c = C("")
2632 b = self.BlockingIOError(1, c)
2633 c.b = b
2634 b.c = c
2635 wr = weakref.ref(c)
2636 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002637 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002638 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002639
2640 def test_abcs(self):
2641 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002642 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2643 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2644 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2645 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002646
2647 def _check_abc_inheritance(self, abcmodule):
2648 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002649 self.assertIsInstance(f, abcmodule.IOBase)
2650 self.assertIsInstance(f, abcmodule.RawIOBase)
2651 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2652 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002653 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002654 self.assertIsInstance(f, abcmodule.IOBase)
2655 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2656 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2657 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002658 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002659 self.assertIsInstance(f, abcmodule.IOBase)
2660 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2661 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2662 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002663
2664 def test_abc_inheritance(self):
2665 # Test implementations inherit from their respective ABCs
2666 self._check_abc_inheritance(self)
2667
2668 def test_abc_inheritance_official(self):
2669 # Test implementations inherit from the official ABCs of the
2670 # baseline "io" module.
2671 self._check_abc_inheritance(io)
2672
Antoine Pitroue033e062010-10-29 10:38:18 +00002673 def _check_warn_on_dealloc(self, *args, **kwargs):
2674 f = open(*args, **kwargs)
2675 r = repr(f)
2676 with self.assertWarns(ResourceWarning) as cm:
2677 f = None
2678 support.gc_collect()
2679 self.assertIn(r, str(cm.warning.args[0]))
2680
2681 def test_warn_on_dealloc(self):
2682 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
2683 self._check_warn_on_dealloc(support.TESTFN, "wb")
2684 self._check_warn_on_dealloc(support.TESTFN, "w")
2685
2686 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
2687 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00002688 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00002689 for fd in fds:
2690 try:
2691 os.close(fd)
2692 except EnvironmentError as e:
2693 if e.errno != errno.EBADF:
2694 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00002695 self.addCleanup(cleanup_fds)
2696 r, w = os.pipe()
2697 fds += r, w
2698 self._check_warn_on_dealloc(r, *args, **kwargs)
2699 # When using closefd=False, there's no warning
2700 r, w = os.pipe()
2701 fds += r, w
2702 with warnings.catch_warnings(record=True) as recorded:
2703 open(r, *args, closefd=False, **kwargs)
2704 support.gc_collect()
2705 self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00002706
2707 def test_warn_on_dealloc_fd(self):
2708 self._check_warn_on_dealloc_fd("rb", buffering=0)
2709 self._check_warn_on_dealloc_fd("rb")
2710 self._check_warn_on_dealloc_fd("r")
2711
2712
Antoine Pitrou243757e2010-11-05 21:15:39 +00002713 def test_pickling(self):
2714 # Pickling file objects is forbidden
2715 for kwargs in [
2716 {"mode": "w"},
2717 {"mode": "wb"},
2718 {"mode": "wb", "buffering": 0},
2719 {"mode": "r"},
2720 {"mode": "rb"},
2721 {"mode": "rb", "buffering": 0},
2722 {"mode": "w+"},
2723 {"mode": "w+b"},
2724 {"mode": "w+b", "buffering": 0},
2725 ]:
2726 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
2727 with self.open(support.TESTFN, **kwargs) as f:
2728 self.assertRaises(TypeError, pickle.dumps, f, protocol)
2729
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002730class CMiscIOTest(MiscIOTest):
2731 io = io
2732
2733class PyMiscIOTest(MiscIOTest):
2734 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002735
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002736
2737@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2738class SignalsTest(unittest.TestCase):
2739
2740 def setUp(self):
2741 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2742
2743 def tearDown(self):
2744 signal.signal(signal.SIGALRM, self.oldalrm)
2745
2746 def alarm_interrupt(self, sig, frame):
2747 1/0
2748
2749 @unittest.skipUnless(threading, 'Threading required for this test.')
2750 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2751 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00002752 invokes the signal handler, and bubbles up the exception raised
2753 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002754 read_results = []
2755 def _read():
Victor Stinnera9293352011-04-30 15:21:58 +02002756 if hasattr(signal, 'pthread_sigmask'):
2757 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002758 s = os.read(r, 1)
2759 read_results.append(s)
2760 t = threading.Thread(target=_read)
2761 t.daemon = True
2762 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00002763 fdopen_kwargs["closefd"] = False
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002764 try:
2765 wio = self.io.open(w, **fdopen_kwargs)
2766 t.start()
2767 signal.alarm(1)
2768 # Fill the pipe enough that the write will be blocking.
2769 # It will be interrupted by the timer armed above. Since the
2770 # other thread has read one byte, the low-level write will
2771 # return with a successful (partial) result rather than an EINTR.
2772 # The buffered IO layer must check for pending signal
2773 # handlers, which in this case will invoke alarm_interrupt().
2774 self.assertRaises(ZeroDivisionError,
Charles-François Natali2d517212011-05-29 16:36:44 +02002775 wio.write, item * (support.PIPE_MAX_SIZE // len(item)))
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002776 t.join()
2777 # We got one byte, get another one and check that it isn't a
2778 # repeat of the first one.
2779 read_results.append(os.read(r, 1))
2780 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2781 finally:
2782 os.close(w)
2783 os.close(r)
2784 # This is deliberate. If we didn't close the file descriptor
2785 # before closing wio, wio would try to flush its internal
2786 # buffer, and block again.
2787 try:
2788 wio.close()
2789 except IOError as e:
2790 if e.errno != errno.EBADF:
2791 raise
2792
2793 def test_interrupted_write_unbuffered(self):
2794 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2795
2796 def test_interrupted_write_buffered(self):
2797 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2798
2799 def test_interrupted_write_text(self):
2800 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2801
Brett Cannon31f59292011-02-21 19:29:56 +00002802 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002803 def check_reentrant_write(self, data, **fdopen_kwargs):
2804 def on_alarm(*args):
2805 # Will be called reentrantly from the same thread
2806 wio.write(data)
2807 1/0
2808 signal.signal(signal.SIGALRM, on_alarm)
2809 r, w = os.pipe()
2810 wio = self.io.open(w, **fdopen_kwargs)
2811 try:
2812 signal.alarm(1)
2813 # Either the reentrant call to wio.write() fails with RuntimeError,
2814 # or the signal handler raises ZeroDivisionError.
2815 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
2816 while 1:
2817 for i in range(100):
2818 wio.write(data)
2819 wio.flush()
2820 # Make sure the buffer doesn't fill up and block further writes
2821 os.read(r, len(data) * 100)
2822 exc = cm.exception
2823 if isinstance(exc, RuntimeError):
2824 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
2825 finally:
2826 wio.close()
2827 os.close(r)
2828
2829 def test_reentrant_write_buffered(self):
2830 self.check_reentrant_write(b"xy", mode="wb")
2831
2832 def test_reentrant_write_text(self):
2833 self.check_reentrant_write("xy", mode="w", encoding="ascii")
2834
Antoine Pitrou707ce822011-02-25 21:24:11 +00002835 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
2836 """Check that a buffered read, when it gets interrupted (either
2837 returning a partial result or EINTR), properly invokes the signal
2838 handler and retries if the latter returned successfully."""
2839 r, w = os.pipe()
2840 fdopen_kwargs["closefd"] = False
2841 def alarm_handler(sig, frame):
2842 os.write(w, b"bar")
2843 signal.signal(signal.SIGALRM, alarm_handler)
2844 try:
2845 rio = self.io.open(r, **fdopen_kwargs)
2846 os.write(w, b"foo")
2847 signal.alarm(1)
2848 # Expected behaviour:
2849 # - first raw read() returns partial b"foo"
2850 # - second raw read() returns EINTR
2851 # - third raw read() returns b"bar"
2852 self.assertEqual(decode(rio.read(6)), "foobar")
2853 finally:
2854 rio.close()
2855 os.close(w)
2856 os.close(r)
2857
Antoine Pitrou20db5112011-08-19 20:32:34 +02002858 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00002859 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
2860 mode="rb")
2861
Antoine Pitrou20db5112011-08-19 20:32:34 +02002862 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00002863 self.check_interrupted_read_retry(lambda x: x,
2864 mode="r")
2865
2866 @unittest.skipUnless(threading, 'Threading required for this test.')
2867 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
2868 """Check that a buffered write, when it gets interrupted (either
2869 returning a partial result or EINTR), properly invokes the signal
2870 handler and retries if the latter returned successfully."""
2871 select = support.import_module("select")
2872 # A quantity that exceeds the buffer size of an anonymous pipe's
2873 # write end.
2874 N = 1024 * 1024
2875 r, w = os.pipe()
2876 fdopen_kwargs["closefd"] = False
2877 # We need a separate thread to read from the pipe and allow the
2878 # write() to finish. This thread is started after the SIGALRM is
2879 # received (forcing a first EINTR in write()).
2880 read_results = []
2881 write_finished = False
2882 def _read():
2883 while not write_finished:
2884 while r in select.select([r], [], [], 1.0)[0]:
2885 s = os.read(r, 1024)
2886 read_results.append(s)
2887 t = threading.Thread(target=_read)
2888 t.daemon = True
2889 def alarm1(sig, frame):
2890 signal.signal(signal.SIGALRM, alarm2)
2891 signal.alarm(1)
2892 def alarm2(sig, frame):
2893 t.start()
2894 signal.signal(signal.SIGALRM, alarm1)
2895 try:
2896 wio = self.io.open(w, **fdopen_kwargs)
2897 signal.alarm(1)
2898 # Expected behaviour:
2899 # - first raw write() is partial (because of the limited pipe buffer
2900 # and the first alarm)
2901 # - second raw write() returns EINTR (because of the second alarm)
2902 # - subsequent write()s are successful (either partial or complete)
2903 self.assertEqual(N, wio.write(item * N))
2904 wio.flush()
2905 write_finished = True
2906 t.join()
2907 self.assertEqual(N, sum(len(x) for x in read_results))
2908 finally:
2909 write_finished = True
2910 os.close(w)
2911 os.close(r)
2912 # This is deliberate. If we didn't close the file descriptor
2913 # before closing wio, wio would try to flush its internal
2914 # buffer, and could block (in case of failure).
2915 try:
2916 wio.close()
2917 except IOError as e:
2918 if e.errno != errno.EBADF:
2919 raise
2920
Antoine Pitrou20db5112011-08-19 20:32:34 +02002921 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00002922 self.check_interrupted_write_retry(b"x", mode="wb")
2923
Antoine Pitrou20db5112011-08-19 20:32:34 +02002924 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00002925 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
2926
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002927
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002928class CSignalsTest(SignalsTest):
2929 io = io
2930
2931class PySignalsTest(SignalsTest):
2932 io = pyio
2933
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002934 # Handling reentrancy issues would slow down _pyio even more, so the
2935 # tests are disabled.
2936 test_reentrant_write_buffered = None
2937 test_reentrant_write_text = None
2938
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002939
Guido van Rossum28524c72007-02-27 05:47:44 +00002940def test_main():
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002941 tests = (CIOTest, PyIOTest,
2942 CBufferedReaderTest, PyBufferedReaderTest,
2943 CBufferedWriterTest, PyBufferedWriterTest,
2944 CBufferedRWPairTest, PyBufferedRWPairTest,
2945 CBufferedRandomTest, PyBufferedRandomTest,
2946 StatefulIncrementalDecoderTest,
2947 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2948 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002949 CMiscIOTest, PyMiscIOTest,
2950 CSignalsTest, PySignalsTest,
2951 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002952
2953 # Put the namespaces of the IO module we are testing and some useful mock
2954 # classes in the __dict__ of each test.
2955 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou328ec742010-09-14 18:37:24 +00002956 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002957 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2958 c_io_ns = {name : getattr(io, name) for name in all_members}
2959 py_io_ns = {name : getattr(pyio, name) for name in all_members}
2960 globs = globals()
2961 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2962 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2963 # Avoid turning open into a bound method.
2964 py_io_ns["open"] = pyio.OpenWrapper
2965 for test in tests:
2966 if test.__name__.startswith("C"):
2967 for name, obj in c_io_ns.items():
2968 setattr(test, name, obj)
2969 elif test.__name__.startswith("Py"):
2970 for name, obj in py_io_ns.items():
2971 setattr(test, name, obj)
2972
2973 support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002974
2975if __name__ == "__main__":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002976 test_main()