blob: f5bb732ad9c4d8cfda466c9a7556d4667a2265fe [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import random
Guido van Rossum28524c72007-02-27 05:47:44 +000027import unittest
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000028import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000029import abc
Antoine Pitroub46b9d52010-08-21 19:09:32 +000030import signal
31import errno
Antoine Pitroue033e062010-10-29 10:38:18 +000032import warnings
Antoine Pitrou243757e2010-11-05 21:15:39 +000033import pickle
Georg Brandl1b37e872010-03-14 10:45:50 +000034from itertools import cycle, count
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000035from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000036from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000037
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000038import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000039import io # C implementation of io
40import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000041try:
42 import threading
43except ImportError:
44 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010045try:
46 import fcntl
47except ImportError:
48 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000049
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000050def _default_chunk_size():
51 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000052 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000053 return f._CHUNK_SIZE
54
55
class MockRawIOWithoutRead:
    """Raw I/O mock that provides readinto() but no read().

    It exists to exercise the default RawIO.read(), which calls readinto().
    Reads are served from a list of canned chunks; a None entry is handed
    back as None.  Writes are recorded in ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        # Bookkeeping: total readinto() calls, and calls made after the
        # canned data ran out.
        self._reads = 0
        self._extraneous_reads = 0

    def write(self, b):
        # Keep an immutable copy of whatever buffer was passed in.
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        # Not a real position, but something has to be returned.
        return 0

    def tell(self):
        # Same caveat as seek().
        return 0

    def readinto(self, buf):
        """Copy the next canned chunk into *buf*.

        Returns the number of bytes copied, 0 when no chunks remain, or
        None when the next queued entry is None.
        """
        self._reads += 1
        capacity = len(buf)
        if not self._read_stack:
            self._extraneous_reads += 1
            return 0
        chunk = self._read_stack[0]
        if chunk is None:
            self._read_stack.pop(0)
            return None
        size = len(chunk)
        if size > capacity:
            # Chunk does not fit: fill the buffer and keep the remainder
            # queued for the next call.
            buf[:] = chunk[:capacity]
            self._read_stack[0] = chunk[capacity:]
            return capacity
        self._read_stack.pop(0)
        buf[:size] = chunk
        return size

    def truncate(self, pos=None):
        return pos
111
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    # MockRawIOWithoutRead layered on the C implementation's RawIOBase.
    pass
114
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    # MockRawIOWithoutRead layered on the pure-Python (_pyio) RawIOBase.
    pass
117
118
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus a read() that pops whole canned chunks."""

    def read(self, n=None):
        """Return the next queued chunk, ignoring *n*.

        Returns b"" (and counts an extraneous read) once the queue is empty.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted queue is expected here; anything else
            # (including KeyboardInterrupt) must propagate.
            self._extraneous_reads += 1
            return b""
128
class CMockRawIO(MockRawIO, io.RawIOBase):
    # MockRawIO layered on the C implementation's RawIOBase.
    pass
131
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    # MockRawIO layered on the pure-Python (_pyio) RawIOBase.
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000134
Guido van Rossuma9e20242007-03-08 00:43:48 +0000135
class MisbehavedRawIO(MockRawIO):
    # A raw stream whose return values are deliberately wrong: doubled
    # write counts and read data, negative positions, and a readinto()
    # count larger than the buffer it was given.

    def write(self, b):
        written = super().write(b)
        return written * 2

    def read(self, n=None):
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Let the parent do its bookkeeping, then report a bogus count.
        super().readinto(buf)
        return 5 * len(buf)
152
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    # MisbehavedRawIO layered on the C implementation's RawIOBase.
    pass
155
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    # MisbehavedRawIO layered on the pure-Python (_pyio) RawIOBase.
    pass
158
159
class CloseFailureIO(MockRawIO):
    # Raw stream whose first close() marks it closed and then raises
    # IOError; later close() calls do nothing.
    closed = 0

    def close(self):
        if self.closed:
            return
        self.closed = 1
        raise IOError
167
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    # CloseFailureIO layered on the C implementation's RawIOBase.
    pass
170
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    # CloseFailureIO layered on the pure-Python (_pyio) RawIOBase.
    pass
173
174
class MockFileIO:
    """Mixin that records the size of every read()/readinto() result.

    Meant to be combined with a concrete stream class; every call is
    delegated to the next class in the MRO through super().
    """

    def __init__(self, data):
        # One entry per call: a byte count, or None when the underlying
        # call returned None.
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000190
class CMockFileIO(MockFileIO, io.BytesIO):
    # Read-recording mixin on top of the C implementation's BytesIO.
    pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000193
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    # Read-recording mixin on top of the pure-Python (_pyio) BytesIO.
    pass
196
197
class MockUnseekableIO:
    """Mixin that makes a stream report itself as non-seekable.

    The concrete subclass must provide an ``UnsupportedOperation``
    attribute holding the exception class to raise.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        error_type = self.UnsupportedOperation
        raise error_type("not seekable")

    def tell(self, *args):
        error_type = self.UnsupportedOperation
        raise error_type("not seekable")
207
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    # Non-seekable BytesIO for the C implementation; the mixin raises the
    # exception class stored here.
    UnsupportedOperation = io.UnsupportedOperation
210
class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    # Non-seekable BytesIO for the pure-Python implementation; the mixin
    # raises the exception class stored here.
    UnsupportedOperation = pyio.UnsupportedOperation
213
214
class MockNonBlockWriterIO:
    """Writer mock that can simulate a write that would block.

    Written data accumulates in ``_write_stack``.  After block_on(char),
    a write stops just before the first occurrence of that byte sequence.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and reset the record."""
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Record *b*, honouring any blocker that is set.

        Returns the number of bytes accepted, or None when the blocker is
        the very first byte (the blocker is cleared in that case).
        """
        b = bytes(b)
        blocker_pos = -1
        if self._blocker_char:
            try:
                blocker_pos = b.index(self._blocker_char)
            except ValueError:
                # Blocker not present in this write: accept it entirely.
                blocker_pos = -1
        if blocker_pos > 0:
            # write data up to the first blocker
            self._write_stack.append(b[:blocker_pos])
            return blocker_pos
        if blocker_pos == 0:
            # cancel blocker and indicate would block
            self._blocker_char = None
            return None
        self._write_stack.append(b)
        return len(b)
258
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    # Non-blocking writer mock on the C RawIOBase; carries the matching
    # BlockingIOError class as an attribute.
    BlockingIOError = io.BlockingIOError
261
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    # Non-blocking writer mock on the pure-Python RawIOBase; carries the
    # matching BlockingIOError class as an attribute.
    BlockingIOError = pyio.BlockingIOError
264
Guido van Rossuma9e20242007-03-08 00:43:48 +0000265
Guido van Rossum28524c72007-02-27 05:47:44 +0000266class IOTest(unittest.TestCase):
267
Neal Norwitze7789b12008-03-24 06:18:09 +0000268 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000269 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000270
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000271 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000272 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000273
Guido van Rossum28524c72007-02-27 05:47:44 +0000274 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000275 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000276 f.truncate(0)
277 self.assertEqual(f.tell(), 5)
278 f.seek(0)
279
280 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000281 self.assertEqual(f.seek(0), 0)
282 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000283 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000284 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000285 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000286 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000287 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000288 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000289 self.assertEqual(f.seek(-1, 2), 13)
290 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000291
Guido van Rossum87429772007-04-10 21:06:59 +0000292 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000293 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000294 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000295
Guido van Rossum9b76da62007-04-11 01:09:03 +0000296 def read_ops(self, f, buffered=False):
297 data = f.read(5)
298 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000299 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000300 self.assertEqual(f.readinto(data), 5)
301 self.assertEqual(data, b" worl")
302 self.assertEqual(f.readinto(data), 2)
303 self.assertEqual(len(data), 5)
304 self.assertEqual(data[:2], b"d\n")
305 self.assertEqual(f.seek(0), 0)
306 self.assertEqual(f.read(20), b"hello world\n")
307 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000308 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000309 self.assertEqual(f.seek(-6, 2), 6)
310 self.assertEqual(f.read(5), b"world")
311 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000312 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000313 self.assertEqual(f.seek(-6, 1), 5)
314 self.assertEqual(f.read(5), b" worl")
315 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000316 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000317 if buffered:
318 f.seek(0)
319 self.assertEqual(f.read(), b"hello world\n")
320 f.seek(6)
321 self.assertEqual(f.read(), b"world\n")
322 self.assertEqual(f.read(), b"")
323
Guido van Rossum34d69e52007-04-10 20:08:41 +0000324 LARGE = 2**31
325
Guido van Rossum53807da2007-04-10 19:01:47 +0000326 def large_file_ops(self, f):
327 assert f.readable()
328 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000329 self.assertEqual(f.seek(self.LARGE), self.LARGE)
330 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000331 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000332 self.assertEqual(f.tell(), self.LARGE + 3)
333 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000334 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000335 self.assertEqual(f.tell(), self.LARGE + 2)
336 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000337 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000338 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000339 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
340 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000341 self.assertEqual(f.read(2), b"x")
342
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000343 def test_invalid_operations(self):
344 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000345 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000346 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000347 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000348 self.assertRaises(exc, fp.read)
349 self.assertRaises(exc, fp.readline)
350 with self.open(support.TESTFN, "wb", buffering=0) as fp:
351 self.assertRaises(exc, fp.read)
352 self.assertRaises(exc, fp.readline)
353 with self.open(support.TESTFN, "rb", buffering=0) as fp:
354 self.assertRaises(exc, fp.write, b"blah")
355 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000356 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000357 self.assertRaises(exc, fp.write, b"blah")
358 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000359 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000360 self.assertRaises(exc, fp.write, "blah")
361 self.assertRaises(exc, fp.writelines, ["blah\n"])
362 # Non-zero seeking from current or end pos
363 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
364 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000365
Antoine Pitrou13348842012-01-29 18:36:34 +0100366 def test_open_handles_NUL_chars(self):
367 fn_with_NUL = 'foo\0bar'
368 self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
369 self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')
370
Guido van Rossum28524c72007-02-27 05:47:44 +0000371 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000372 with self.open(support.TESTFN, "wb", buffering=0) as f:
373 self.assertEqual(f.readable(), False)
374 self.assertEqual(f.writable(), True)
375 self.assertEqual(f.seekable(), True)
376 self.write_ops(f)
377 with self.open(support.TESTFN, "rb", buffering=0) as f:
378 self.assertEqual(f.readable(), True)
379 self.assertEqual(f.writable(), False)
380 self.assertEqual(f.seekable(), True)
381 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000382
Guido van Rossum87429772007-04-10 21:06:59 +0000383 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000384 with self.open(support.TESTFN, "wb") as f:
385 self.assertEqual(f.readable(), False)
386 self.assertEqual(f.writable(), True)
387 self.assertEqual(f.seekable(), True)
388 self.write_ops(f)
389 with self.open(support.TESTFN, "rb") as f:
390 self.assertEqual(f.readable(), True)
391 self.assertEqual(f.writable(), False)
392 self.assertEqual(f.seekable(), True)
393 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000394
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000395 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000396 with self.open(support.TESTFN, "wb") as f:
397 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
398 with self.open(support.TESTFN, "rb") as f:
399 self.assertEqual(f.readline(), b"abc\n")
400 self.assertEqual(f.readline(10), b"def\n")
401 self.assertEqual(f.readline(2), b"xy")
402 self.assertEqual(f.readline(4), b"zzy\n")
403 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000404 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000405 self.assertRaises(TypeError, f.readline, 5.3)
406 with self.open(support.TESTFN, "r") as f:
407 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000408
Guido van Rossum28524c72007-02-27 05:47:44 +0000409 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000410 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000411 self.write_ops(f)
412 data = f.getvalue()
413 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000414 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000415 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000416
Guido van Rossum53807da2007-04-10 19:01:47 +0000417 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000418 # On Windows and Mac OSX this test comsumes large resources; It takes
419 # a long time to build the >2GB file and takes >2GB of disk space
420 # therefore the resource must be enabled to run this test.
421 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000422 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000423 print("\nTesting large file ops skipped on %s." % sys.platform,
424 file=sys.stderr)
425 print("It requires %d bytes and a long time." % self.LARGE,
426 file=sys.stderr)
427 print("Use 'regrtest.py -u largefile test_io' to run it.",
428 file=sys.stderr)
429 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000430 with self.open(support.TESTFN, "w+b", 0) as f:
431 self.large_file_ops(f)
432 with self.open(support.TESTFN, "w+b") as f:
433 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000434
435 def test_with_open(self):
436 for bufsize in (0, 1, 100):
437 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000438 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000439 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000440 self.assertEqual(f.closed, True)
441 f = None
442 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000443 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000444 1/0
445 except ZeroDivisionError:
446 self.assertEqual(f.closed, True)
447 else:
448 self.fail("1/0 didn't raise an exception")
449
Antoine Pitrou08838b62009-01-21 00:55:13 +0000450 # issue 5008
451 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000452 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000453 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000454 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000455 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000456 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000457 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000458 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000459 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000460
Guido van Rossum87429772007-04-10 21:06:59 +0000461 def test_destructor(self):
462 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000463 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000464 def __del__(self):
465 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000466 try:
467 f = super().__del__
468 except AttributeError:
469 pass
470 else:
471 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000472 def close(self):
473 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000474 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000475 def flush(self):
476 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000477 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000478 with support.check_warnings(('', ResourceWarning)):
479 f = MyFileIO(support.TESTFN, "wb")
480 f.write(b"xxx")
481 del f
482 support.gc_collect()
483 self.assertEqual(record, [1, 2, 3])
484 with self.open(support.TESTFN, "rb") as f:
485 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000486
487 def _check_base_destructor(self, base):
488 record = []
489 class MyIO(base):
490 def __init__(self):
491 # This exercises the availability of attributes on object
492 # destruction.
493 # (in the C version, close() is called by the tp_dealloc
494 # function, not by __del__)
495 self.on_del = 1
496 self.on_close = 2
497 self.on_flush = 3
498 def __del__(self):
499 record.append(self.on_del)
500 try:
501 f = super().__del__
502 except AttributeError:
503 pass
504 else:
505 f()
506 def close(self):
507 record.append(self.on_close)
508 super().close()
509 def flush(self):
510 record.append(self.on_flush)
511 super().flush()
512 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000513 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000514 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000515 self.assertEqual(record, [1, 2, 3])
516
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000517 def test_IOBase_destructor(self):
518 self._check_base_destructor(self.IOBase)
519
520 def test_RawIOBase_destructor(self):
521 self._check_base_destructor(self.RawIOBase)
522
523 def test_BufferedIOBase_destructor(self):
524 self._check_base_destructor(self.BufferedIOBase)
525
526 def test_TextIOBase_destructor(self):
527 self._check_base_destructor(self.TextIOBase)
528
Guido van Rossum87429772007-04-10 21:06:59 +0000529 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000530 with self.open(support.TESTFN, "wb") as f:
531 f.write(b"xxx")
532 with self.open(support.TESTFN, "rb") as f:
533 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000534
Guido van Rossumd4103952007-04-12 05:44:49 +0000535 def test_array_writes(self):
536 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000537 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000538 with self.open(support.TESTFN, "wb", 0) as f:
539 self.assertEqual(f.write(a), n)
540 with self.open(support.TESTFN, "wb") as f:
541 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000542
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000543 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000544 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000545 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000546
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000547 def test_read_closed(self):
548 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000549 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000550 with self.open(support.TESTFN, "r") as f:
551 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000552 self.assertEqual(file.read(), "egg\n")
553 file.seek(0)
554 file.close()
555 self.assertRaises(ValueError, file.read)
556
557 def test_no_closefd_with_filename(self):
558 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000559 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000560
561 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000562 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000563 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000564 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000565 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000566 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000567 self.assertEqual(file.buffer.raw.closefd, False)
568
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000569 def test_garbage_collection(self):
570 # FileIO objects are collected, and collecting them flushes
571 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000572 with support.check_warnings(('', ResourceWarning)):
573 f = self.FileIO(support.TESTFN, "wb")
574 f.write(b"abcxxx")
575 f.f = f
576 wr = weakref.ref(f)
577 del f
578 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000579 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000580 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000581 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000582
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000583 def test_unbounded_file(self):
584 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
585 zero = "/dev/zero"
586 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000587 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000588 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000589 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000590 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000591 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000592 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000593 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000594 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000595 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000596 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000597 self.assertRaises(OverflowError, f.read)
598
Antoine Pitrou6be88762010-05-03 16:48:20 +0000599 def test_flush_error_on_close(self):
600 f = self.open(support.TESTFN, "wb", buffering=0)
601 def bad_flush():
602 raise IOError()
603 f.flush = bad_flush
604 self.assertRaises(IOError, f.close) # exception not swallowed
605
606 def test_multi_close(self):
607 f = self.open(support.TESTFN, "wb", buffering=0)
608 f.close()
609 f.close()
610 f.close()
611 self.assertRaises(ValueError, f.flush)
612
Antoine Pitrou328ec742010-09-14 18:37:24 +0000613 def test_RawIOBase_read(self):
614 # Exercise the default RawIOBase.read() implementation (which calls
615 # readinto() internally).
616 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
617 self.assertEqual(rawio.read(2), b"ab")
618 self.assertEqual(rawio.read(2), b"c")
619 self.assertEqual(rawio.read(2), b"d")
620 self.assertEqual(rawio.read(2), None)
621 self.assertEqual(rawio.read(2), b"ef")
622 self.assertEqual(rawio.read(2), b"g")
623 self.assertEqual(rawio.read(2), None)
624 self.assertEqual(rawio.read(2), b"")
625
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400626 def test_types_have_dict(self):
627 test = (
628 self.IOBase(),
629 self.RawIOBase(),
630 self.TextIOBase(),
631 self.StringIO(),
632 self.BytesIO()
633 )
634 for obj in test:
635 self.assertTrue(hasattr(obj, "__dict__"))
636
Ross Lagerwall59142db2011-10-31 20:34:46 +0200637 def test_opener(self):
638 with self.open(support.TESTFN, "w") as f:
639 f.write("egg\n")
640 fd = os.open(support.TESTFN, os.O_RDONLY)
641 def opener(path, flags):
642 return fd
643 with self.open("non-existent", "r", opener=opener) as f:
644 self.assertEqual(f.read(), "egg\n")
645
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200646 def test_fileio_closefd(self):
647 # Issue #4841
648 with self.open(__file__, 'rb') as f1, \
649 self.open(__file__, 'rb') as f2:
650 fileio = self.FileIO(f1.fileno(), closefd=False)
651 # .__init__() must not close f1
652 fileio.__init__(f2.fileno(), closefd=False)
653 f1.readline()
654 # .close() must not close f2
655 fileio.close()
656 f2.readline()
657
658
class CIOTest(IOTest):
    # IOTest plus a regression test that only applies to the C
    # implementation.

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        obj.obj = obj
        wr = weakref.ref(obj)
        del MyIO
        del obj
        support.gc_collect()
        # assertIsNone reports the surviving object on failure.
        self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000678
class PyIOTest(IOTest):
    # Runs the IOTest suite unchanged; intended for the pure-Python
    # implementation (NOTE(review): the attribute wiring is not visible here).
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000681
Guido van Rossuma9e20242007-03-08 00:43:48 +0000682
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000683class CommonBufferedTests:
684 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
685
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000686 def test_detach(self):
687 raw = self.MockRawIO()
688 buf = self.tp(raw)
689 self.assertIs(buf.detach(), raw)
690 self.assertRaises(ValueError, buf.detach)
691
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000692 def test_fileno(self):
693 rawio = self.MockRawIO()
694 bufio = self.tp(rawio)
695
Ezio Melottib3aedd42010-11-20 19:04:17 +0000696 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000697
    def test_no_fileno(self):
        # Placeholder: currently asserts nothing.
        # XXX will we always have fileno() function? If so, kill
        # this test.  Else, write it.
        pass
702
703 def test_invalid_args(self):
704 rawio = self.MockRawIO()
705 bufio = self.tp(rawio)
706 # Invalid whence
707 self.assertRaises(ValueError, bufio.seek, 0, -1)
708 self.assertRaises(ValueError, bufio.seek, 0, 3)
709
710 def test_override_destructor(self):
711 tp = self.tp
712 record = []
713 class MyBufferedIO(tp):
714 def __del__(self):
715 record.append(1)
716 try:
717 f = super().__del__
718 except AttributeError:
719 pass
720 else:
721 f()
722 def close(self):
723 record.append(2)
724 super().close()
725 def flush(self):
726 record.append(3)
727 super().flush()
728 rawio = self.MockRawIO()
729 bufio = MyBufferedIO(rawio)
730 writable = bufio.writable()
731 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000732 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000733 if writable:
734 self.assertEqual(record, [1, 2, 3])
735 else:
736 self.assertEqual(record, [1, 2])
737
738 def test_context_manager(self):
739 # Test usability as a context manager
740 rawio = self.MockRawIO()
741 bufio = self.tp(rawio)
742 def _with():
743 with bufio:
744 pass
745 _with()
746 # bufio should now be closed, and using it a second time should raise
747 # a ValueError.
748 self.assertRaises(ValueError, _with)
749
750 def test_error_through_destructor(self):
751 # Test that the exception state is not modified by a destructor,
752 # even if close() fails.
753 rawio = self.CloseFailureIO()
754 def f():
755 self.tp(rawio).xyzzy
756 with support.captured_output("stderr") as s:
757 self.assertRaises(AttributeError, f)
758 s = s.getvalue().strip()
759 if s:
760 # The destructor *may* have printed an unraisable error, check it
761 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000762 self.assertTrue(s.startswith("Exception IOError: "), s)
763 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000764
Antoine Pitrou716c4442009-05-23 19:04:03 +0000765 def test_repr(self):
766 raw = self.MockRawIO()
767 b = self.tp(raw)
768 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
769 self.assertEqual(repr(b), "<%s>" % clsname)
770 raw.name = "dummy"
771 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
772 raw.name = b"dummy"
773 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
774
Antoine Pitrou6be88762010-05-03 16:48:20 +0000775 def test_flush_error_on_close(self):
776 raw = self.MockRawIO()
777 def bad_flush():
778 raise IOError()
779 raw.flush = bad_flush
780 b = self.tp(raw)
781 self.assertRaises(IOError, b.close) # exception not swallowed
782
783 def test_multi_close(self):
784 raw = self.MockRawIO()
785 b = self.tp(raw)
786 b.close()
787 b.close()
788 b.close()
789 self.assertRaises(ValueError, b.flush)
790
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000791 def test_unseekable(self):
792 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
793 self.assertRaises(self.UnsupportedOperation, bufio.tell)
794 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
795
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000796 def test_readonly_attributes(self):
797 raw = self.MockRawIO()
798 buf = self.tp(raw)
799 x = self.MockRawIO()
800 with self.assertRaises(AttributeError):
801 buf.raw = x
802
Guido van Rossum78892e42007-04-06 17:31:18 +0000803
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Reader-side tests.  ``tp`` (the class under test) and the mock raw
    # streams are read from ``self``; ``tp`` is set by the concrete
    # subclasses.
    read_mode = "rb"

    def test_constructor(self):
        # __init__() may be called again on an existing object, with or
        # without an explicit buffer size; non-positive sizes are rejected.
        raw_stream = self.MockRawIO([b"abc"])
        reader = self.tp(raw_stream)
        reader.__init__(raw_stream)
        reader.__init__(raw_stream, buffer_size=1024)
        reader.__init__(raw_stream, buffer_size=16)
        self.assertEqual(b"abc", reader.read())
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, reader.__init__, raw_stream,
                              buffer_size=bad_size)
        raw_stream = self.MockRawIO([b"abc"])
        reader.__init__(raw_stream)
        self.assertEqual(b"abc", reader.read())

    def test_read(self):
        for size_arg in (None, 7):
            raw_stream = self.MockRawIO((b"abc", b"d", b"efg"))
            reader = self.tp(raw_stream)
            self.assertEqual(b"abcdefg", reader.read(size_arg))
        # Invalid args
        with self.assertRaises(ValueError):
            reader.read(-2)

    def test_read1(self):
        # ``_reads`` is compared after each call to check how many reads
        # have reached the mock raw stream so far.
        raw_stream = self.MockRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw_stream)
        self.assertEqual(b"a", reader.read(1))
        self.assertEqual(b"b", reader.read1(1))
        self.assertEqual(raw_stream._reads, 1)
        steps = ((b"c", 1), (b"d", 2), (b"efg", 3), (b"", 4))
        for expected_data, expected_reads in steps:
            self.assertEqual(expected_data, reader.read1(100))
            self.assertEqual(raw_stream._reads, expected_reads)
        # Invalid args
        with self.assertRaises(ValueError):
            reader.read1(-1)

    def test_readinto(self):
        raw_stream = self.MockRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw_stream)
        target = bytearray(2)
        # (bytes reported, buffer contents afterwards); a short read leaves
        # the tail of the buffer untouched.
        steps = ((2, b"ab"), (2, b"cd"), (2, b"ef"), (1, b"gf"), (0, b"gf"))
        for nread, contents in steps:
            self.assertEqual(reader.readinto(target), nread)
            self.assertEqual(target, contents)
        # Same checks when the raw stream yields None after its data.
        raw_stream = self.MockRawIO((b"abc", None))
        reader = self.tp(raw_stream)
        for nread, contents in ((2, b"ab"), (1, b"cb")):
            self.assertEqual(reader.readinto(target), nread)
            self.assertEqual(target, contents)

    def test_readlines(self):
        def fresh_reader():
            raw_stream = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(raw_stream)
        every_line = [b"abc\n", b"d\n", b"ef"]
        self.assertEqual(fresh_reader().readlines(), every_line)
        self.assertEqual(fresh_reader().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(fresh_reader().readlines(None), every_line)

    def test_buffering(self):
        payload = b"abcdefghi"
        total = len(payload)

        # Each case: buffer size, sizes passed to bufio.read(), and the
        # read sizes expected in the raw stream's read_history.
        cases = [
            [100, [3, 1, 4, 8], [total, 0]],
            [100, [3, 3, 3], [total]],
            [4, [1, 2, 4, 2], [4, 4, 1]],
        ]

        for buffer_size, reader_sizes, raw_sizes in cases:
            raw_stream = self.MockFileIO(payload)
            reader = self.tp(raw_stream, buffer_size=buffer_size)
            offset = 0
            for nbytes in reader_sizes:
                self.assertEqual(reader.read(nbytes),
                                 payload[offset:offset + nbytes])
                offset += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(raw_stream.read_history, raw_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        raw_stream = self.MockRawIO(
            (b"abc", b"d", None, b"efg", None, None, None))
        reader = self.tp(raw_stream)
        self.assertEqual(b"abcd", reader.read(6))
        self.assertEqual(b"e", reader.read(1))
        self.assertEqual(b"fg", reader.read())
        self.assertEqual(b"", reader.peek(1))
        self.assertIsNone(reader.read())
        self.assertEqual(b"", reader.read())

        # readall() is called on the raw stream directly here.
        raw_stream = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", raw_stream.readall())
        self.assertIsNone(raw_stream.readall())

    def test_read_past_eof(self):
        raw_stream = self.MockRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw_stream)

        self.assertEqual(b"abcdefg", reader.read(9000))

    def test_read_all(self):
        raw_stream = self.MockRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw_stream)

        self.assertEqual(b"abcdefg", reader.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(values)
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode,
                           buffering=0) as raw:
                reader = self.tp(raw, 8)
                failures = []
                pieces = []
                def worker():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for size in cycle([1, 19]):
                            piece = reader.read(size)
                            if not piece:
                                break
                            # list.append() is atomic
                            pieces.append(piece)
                    except Exception as exc:
                        failures.append(exc)
                        raise
                workers = [threading.Thread(target=worker)
                           for _ in range(20)]
                for thread in workers:
                    thread.start()
                time.sleep(0.02) # yield
                for thread in workers:
                    thread.join()
                self.assertFalse(failures,
                    "the following exceptions were caught: %r" % failures)
                combined = b''.join(pieces)
                # Chunk order is not checked, only that every byte value
                # was read exactly N times overall.
                for value in range(256):
                    self.assertEqual(combined.count(bytes([value])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        # Extends the common test: the checks are repeated after a read.
        reader = self.tp(self.MockUnseekableIO(b"A" * 10))
        unsupported = self.UnsupportedOperation
        self.assertRaises(unsupported, reader.tell)
        self.assertRaises(unsupported, reader.seek, 0)
        reader.read(1)
        self.assertRaises(unsupported, reader.seek, 0)
        self.assertRaises(unsupported, reader.tell)

    def test_misbehaved_io(self):
        raw_stream = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw_stream)
        with self.assertRaises(IOError):
            reader.seek(0)
        with self.assertRaises(IOError):
            reader.tell()

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            layouts = (
                # Simple case: one raw read is enough to satisfy the request.
                [b"x" * n],
                # A more complex case where two raw reads are needed to
                # satisfy the request.
                [b"x" * (n - 1), b"x"],
            )
            for chunks in layouts:
                raw_stream = self.MockRawIO(chunks)
                reader = self.tp(raw_stream, bufsize)
                self.assertEqual(reader.read(n), b"x" * n)
                self.assertEqual(raw_stream._extraneous_reads, 0,
                    "failed for {}: {} != 0".format(
                        n, raw_stream._extraneous_reads))
999
1000
class CBufferedReaderTest(BufferedReaderTest):
    # Runs the reader tests against io.BufferedReader and adds a few checks
    # of its own.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            # Any of these errors is accepted for an absurd buffer size.
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed __init__() the object must refuse to read.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        #
        # Opening with "w+b" creates the file, so register its removal up
        # front; otherwise it is left on disk after the test.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Self-reference: the object now sits in a reference cycle.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001041
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the inherited reader tests against pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001044
Guido van Rossuma9e20242007-03-08 00:43:48 +00001045
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Writer-side tests.  ``tp`` (the class under test) and the mock raw
    # streams are read from ``self``; ``tp`` is set by the concrete
    # subclasses.
    write_mode = "wb"

    def test_constructor(self):
        # __init__() may be called again on an existing object; non-positive
        # buffer sizes are rejected, and the object is usable again after a
        # subsequent successful __init__().
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # Buffered data reaches the raw stream when detach() is called.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        # Feed the data in 3-byte slices through an 8-byte buffer.
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Helper, not a test: ``intermediate_func(bufio)`` is called after
        # every write so callers can interleave flushes, seeks or truncates.
        # Lots of writes, test the flushed output is as expected.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            # Clamp the final write to the bytes that remain.
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Both helpers finish by seeking back to the position they started
        # from, so the written data must be unaffected.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        # Seeking back and overwriting must change only the overwritten
        # bytes of the underlying BytesIO.
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_destructor(self):
        # Dropping the last reference must push buffered data to the raw
        # stream.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        #
        # The file is created below, so register its removal up front;
        # otherwise it is left on disk after the test.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            # The position is expected to stay at 6 after the truncate.
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            # Pre-slice the data into chunks that the threads pop and write.
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Chunk order is not checked, only that every byte value was
            # written exactly N times overall.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing a third positional argument must raise the warning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001261
1262
class CBufferedWriterTest(BufferedWriterTest):
    # Runs the writer tests against io.BufferedWriter and adds a few checks
    # of its own.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            # Any of these errors is accepted for an absurd buffer size.
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed __init__() the object must refuse to write.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        #
        # Opening with "w+b" creates the file, so register its removal up
        # front; otherwise it is left on disk after the test.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Self-reference: the object now sits in a reference cycle.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1300
1301
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the inherited writer tests against pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001304
class BufferedRWPairTest(unittest.TestCase):
    # Tests for the reader/writer pair class, which is read from ``self.tp``
    # and constructed as ``tp(reader, writer)``.

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_detach(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_deprecation(self):
        # Passing a fourth positional argument must raise the warning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        # A fresh pair per call, since each readlines() consumes the reader.
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        # An explicit None hint must behave like no hint.  (This assertion
        # used to be an exact duplicate of the one above.)
        self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        # Each flush() must deliver exactly the preceding write to the
        # writer side.
        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # Only a prefix check on peek(); the read() that follows must still
        # return the same bytes.
        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        # Mock raw stream whose isatty() answer is chosen at construction.
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # The pair is a tty as soon as either side reports being one.
        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001422
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPair tests against io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001425
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPair tests against pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001428
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001429
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Shared tests for a buffered object that both reads and writes.
    # Concrete subclasses set ``tp`` to the implementation under test.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Run both inherited constructor checks explicitly.
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_constructor(self)

    def test_read_and_write(self):
        mock_raw = self.MockRawIO((b"asdf", b"ghjk"))
        bufio = self.tp(mock_raw, 8)

        self.assertEqual(b"as", bufio.read(2))
        for chunk in (b"ddd", b"eee"):
            bufio.write(chunk)
        # The writes are still buffered: nothing reached the raw stream.
        self.assertFalse(mock_raw._write_stack)
        self.assertEqual(b"ghjk", bufio.read())
        self.assertEqual(b"dddeee", mock_raw._write_stack[0])

    def test_seek_and_tell(self):
        backing = self.BytesIO(b"asdfghjkl")
        bufio = self.tp(backing)

        self.assertEqual(b"as", bufio.read(2))
        self.assertEqual(2, bufio.tell())
        bufio.seek(0, 0)
        self.assertEqual(b"asdf", bufio.read(4))

        # Overwrite four bytes after the read, then re-read from the start.
        bufio.write(b"123f")
        bufio.seek(0, 0)
        self.assertEqual(b"asdf123fl", bufio.read())
        self.assertEqual(9, bufio.tell())
        # Seek relative to the end, then relative to the current position.
        bufio.seek(-4, 2)
        self.assertEqual(5, bufio.tell())
        bufio.seek(2, 1)
        self.assertEqual(7, bufio.tell())
        self.assertEqual(b"fl", bufio.read(11))
        bufio.flush()
        self.assertEqual(b"asdf123fl", backing.getvalue())

        # A float offset must be rejected.
        self.assertRaises(TypeError, bufio.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario: ``read_func(bufio[, n])`` is the read primitive
        # being exercised around write() and flush() calls.
        backing = self.BytesIO(b"abcdefghi")
        bufio = self.tp(backing)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        # Change the raw stream behind the buffered object's back.
        backing.seek(0, 0)
        backing.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        def _read(bufio, *args):
            return bufio.read(*args)
        self.check_flush_and_read(_read)

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # A negative size means "read everything": use a big buffer.
            size = 9999 if n < 0 else n
            scratch = bytearray(size)
            got = bufio.readinto(scratch)
            return bytes(scratch[:got])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            data = bufio.peek(n)
            if n != -1:
                data = data[:n]
            # Emulate a read by skipping over what was peeked.
            bufio.seek(len(data), 1)
            return data
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        backing = self.BytesIO(b"abcdefghi")
        bufio = self.tp(backing)

        for chunk in (b"123", b"45"):
            bufio.write(chunk)
            bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", backing.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_threads(self)

    def test_writes_and_peek(self):
        # First pass: peek at the current position between writes.
        def _peek_in_place(bufio):
            bufio.peek(1)
        self.check_writes(_peek_in_place)

        # Second pass: step back one byte, peek, then restore the position.
        def _peek_previous(bufio):
            saved = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(saved, 0)
        self.check_writes(_peek_previous)

    def test_writes_and_reads(self):
        def _reread_last(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_reread_last)

    def test_writes_and_read1s(self):
        def _reread_last(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_reread_last)

    def test_writes_and_readintos(self):
        def _reread_last(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_reread_last)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            backing = self.BytesIO(b"A" * 10)
            bufio = self.tp(backing, 4)
            end_pos = overwrite_size + 1
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), end_pos)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), end_pos)
            expected = (b"A" + b"B" * overwrite_size
                        + b"A" * (9 - overwrite_size))
            self.assertEqual(backing.getvalue(), expected)

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        original = b"\x80\x81\x82\x83\x84"
        length = len(original)
        for i in range(length):
            for j in range(i, length):
                backing = self.BytesIO(original)
                bufio = self.tp(backing, 100)
                mutate(bufio, i, j)
                bufio.flush()
                # Same write order as mutate(): when i == j the later
                # write (value 1) wins.
                expected = bytearray(original)
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(backing.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        backing = self.BytesIO(b"A" * 10)
        bufio = self.tp(backing, 100)
        # The read fills the read buffer; truncate() must still use the
        # logical position.
        self.assertEqual(bufio.read(2), b"AA")
        self.assertEqual(bufio.truncate(), 2)
        # The write only grows the write buffer; same expectation.
        self.assertEqual(bufio.write(b"BB"), 2)
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as backing, \
                self.tp(backing, 100) as f:
            f.write(b"1")
            self.assertEqual(f.read(1), b'b')
            f.write(b'2')
            self.assertEqual(f.read1(1), b'd')
            f.write(b'3')
            one_byte = bytearray(1)
            f.readinto(one_byte)
            self.assertEqual(one_byte, b'f')
            f.write(b'4')
            self.assertEqual(f.peek(1), b'h')
            f.flush()
            self.assertEqual(backing.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as backing, \
                self.tp(backing, 100) as f:
            self.assertEqual(f.read(1), b'a')
            f.write(b"2")
            self.assertEqual(f.read(1), b'c')
            f.flush()
            self.assertEqual(backing.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        with self.BytesIO(b'ab\ncdef\ng\n') as backing, \
                self.tp(backing) as f:
            f.write(b'1')
            self.assertEqual(f.readline(), b'b\n')
            f.write(b'2')
            self.assertEqual(f.readline(), b'def\n')
            f.write(b'3')
            self.assertEqual(f.readline(), b'\n')
            f.flush()
            self.assertEqual(backing.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None
1653
class CBufferedRandomTest(BufferedRandomTest):
    # Runs the BufferedRandom tests against io.BufferedRandom.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Re-initialising with an absurdly large buffer size must fail.
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            bufio.__init__(rawio, sys.maxsize)

    def test_garbage_collection(self):
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)
1670
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the BufferedRandom tests against pyio.BufferedRandom.
    tp = pyio.BufferedRandom
1673
1674
# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
# properties:
#   - A single output character can correspond to many bytes of input.
#   - The number of input bytes to complete the character can be
#     undetermined until the last input byte is received.
#   - The number of input bytes can vary depending on previous input.
#   - A single input byte can correspond to many characters of output.
#   - The number of output characters can be undetermined until the
#     last input byte is received.
#   - The number of output characters can vary depending on previous input.
1685
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.  A missing
        number counts as 0.
      - 'o' followed by a number sets the output length, O (maximum 99).
        A missing number counts as 0.
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  A change to I applies to the bytes
    that follow it.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Restore the initial state: I = O = 1 and an empty word buffer."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return ``(pending_bytes, flags)`` describing the current state.

        I and O are packed into one integer as ``i*100 + o`` after each
        has been XORed with 1.
        """
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        buffer, io = state
        self.buffer = bytearray(buffer)
        i, o = divmod(io, 100)
        # Undo the XOR applied by getstate().
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Consume the bytes of *input* and return the decoded text.

        Bytes of an unfinished word stay in ``self.buffer``.  When
        *final* is true, a non-empty buffer is processed as the last
        word.
        """
        output = ''
        for b in input:
            if self.i == 0: # variable-length, terminated with period
                if b == ord('.'):
                    # A period with nothing buffered is an extra period:
                    # ignore it.
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer: # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        """Interpret the buffered word, clear the buffer, return its output.

        'i...' and 'o...' words update I and O and produce no output.
        Any other word is returned padded or truncated to O characters
        (verbatim if O is 0), followed by a period.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o # pad out with hyphens
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # The lookup hook below answers only while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function for the 'test_decoder' codec.

        Returns a CodecInfo that encodes as latin-1 and decodes
        incrementally with this class, but only while ``codecEnabled`` is
        true and *name* is 'test_decoder'.  Otherwise returns None.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None
1770
# Install the lookup hook for the test decoder defined above.  The hook does
# nothing until a test sets StatefulIncrementalDecoder.codecEnabled to True.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1774
1775
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases, each with a fresh decoder.
        for data, eof, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(data, eof), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001818
1819class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001820
def setUp(self):
    # Start from a clean slate: no leftover scratch file.
    support.unlink(support.TESTFN)
    # Sample with mixed line endings, and its "\n"-normalized text form.
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    normalized_bytes = b"AAA\nBBB\nCCC\nDDD\nEEE\n"
    self.normalized = normalized_bytes.decode("ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001825
def tearDown(self):
    # Remove the scratch file a test may have created.
    support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001828
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001829 def test_constructor(self):
1830 r = self.BytesIO(b"\xc3\xa9\n\n")
1831 b = self.BufferedReader(r, 1000)
1832 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001833 t.__init__(b, encoding="latin-1", newline="\r\n")
1834 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001835 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001836 t.__init__(b, encoding="utf-8", line_buffering=True)
1837 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001838 self.assertEqual(t.line_buffering, True)
1839 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001840 self.assertRaises(TypeError, t.__init__, b, newline=42)
1841 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1842
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001843 def test_detach(self):
1844 r = self.BytesIO()
1845 b = self.BufferedWriter(r)
1846 t = self.TextIOWrapper(b)
1847 self.assertIs(t.detach(), b)
1848
1849 t = self.TextIOWrapper(b, encoding="ascii")
1850 t.write("howdy")
1851 self.assertFalse(r.getvalue())
1852 t.detach()
1853 self.assertEqual(r.getvalue(), b"howdy")
1854 self.assertRaises(ValueError, t.detach)
1855
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001856 def test_repr(self):
1857 raw = self.BytesIO("hello".encode("utf-8"))
1858 b = self.BufferedReader(raw)
1859 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001860 modname = self.TextIOWrapper.__module__
1861 self.assertEqual(repr(t),
1862 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1863 raw.name = "dummy"
1864 self.assertEqual(repr(t),
1865 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001866 t.mode = "r"
1867 self.assertEqual(repr(t),
1868 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001869 raw.name = b"dummy"
1870 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001871 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001872
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001873 def test_line_buffering(self):
1874 r = self.BytesIO()
1875 b = self.BufferedWriter(r, 1000)
1876 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001877 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001878 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001879 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001880 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001881 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001882 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001883
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001884 def test_encoding(self):
1885 # Check the encoding attribute is always set, and valid
1886 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001887 t = self.TextIOWrapper(b, encoding="utf-8")
1888 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001889 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001890 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001891 codecs.lookup(t.encoding)
1892
1893 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001894 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001895 b = self.BytesIO(b"abc\n\xff\n")
1896 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001897 self.assertRaises(UnicodeError, t.read)
1898 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001899 b = self.BytesIO(b"abc\n\xff\n")
1900 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001901 self.assertRaises(UnicodeError, t.read)
1902 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001903 b = self.BytesIO(b"abc\n\xff\n")
1904 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001905 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001906 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001907 b = self.BytesIO(b"abc\n\xff\n")
1908 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001909 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001910
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001911 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001912 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001913 b = self.BytesIO()
1914 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001915 self.assertRaises(UnicodeError, t.write, "\xff")
1916 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001917 b = self.BytesIO()
1918 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001919 self.assertRaises(UnicodeError, t.write, "\xff")
1920 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001921 b = self.BytesIO()
1922 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001923 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001924 t.write("abc\xffdef\n")
1925 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001926 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001927 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001928 b = self.BytesIO()
1929 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001930 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001931 t.write("abc\xffdef\n")
1932 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001933 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001934
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001935 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001936 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1937
1938 tests = [
1939 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001940 [ '', input_lines ],
1941 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1942 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1943 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001944 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001945 encodings = (
1946 'utf-8', 'latin-1',
1947 'utf-16', 'utf-16-le', 'utf-16-be',
1948 'utf-32', 'utf-32-le', 'utf-32-be',
1949 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001950
Guido van Rossum8358db22007-08-18 21:39:55 +00001951 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001952 # character in TextIOWrapper._pending_line.
1953 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001954 # XXX: str.encode() should return bytes
1955 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001956 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001957 for bufsize in range(1, 10):
1958 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001959 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1960 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001961 encoding=encoding)
1962 if do_reads:
1963 got_lines = []
1964 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001965 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001966 if c2 == '':
1967 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00001968 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001969 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001970 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001971 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001972
1973 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001974 self.assertEqual(got_line, exp_line)
1975 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00001976
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001977 def test_newlines_input(self):
1978 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001979 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1980 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03001981 (None, normalized.decode("ascii").splitlines(keepends=True)),
1982 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001983 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1984 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1985 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001986 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001987 buf = self.BytesIO(testdata)
1988 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001989 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00001990 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001991 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00001992
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001993 def test_newlines_output(self):
1994 testdict = {
1995 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1996 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1997 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1998 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1999 }
2000 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2001 for newline, expected in tests:
2002 buf = self.BytesIO()
2003 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2004 txt.write("AAA\nB")
2005 txt.write("BB\nCCC\n")
2006 txt.write("X\rY\r\nZ")
2007 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002008 self.assertEqual(buf.closed, False)
2009 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002010
def test_destructor(self):
    # Dropping the last reference to the wrapper must flush the pending
    # text and close the underlying stream.
    seen_at_close = []
    parent = self.BytesIO

    class MyBytesIO(parent):
        def close(self):
            # Record what the stream holds at the moment it is closed.
            seen_at_close.append(self.getvalue())
            parent.close(self)

    b = MyBytesIO()
    t = self.TextIOWrapper(b, encoding="ascii")
    t.write("abc")
    del t
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002024
def test_override_destructor(self):
    # Overridden __del__, close() and flush() must each run, in that
    # order, when the object is destroyed.
    events = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            events.append(1)
            try:
                inherited = super().__del__
            except AttributeError:
                # The base class has no __del__ to chain to.
                return
            inherited()

        def close(self):
            events.append(2)
            super().close()

        def flush(self):
            events.append(3)
            super().flush()

    b = self.BytesIO()
    t = MyTextIO(b, encoding="ascii")
    del t
    support.gc_collect()
    self.assertEqual(events, [1, 2, 3])
2047
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()

    def touch_missing_attribute():
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    message = captured.getvalue().strip()
    if not message:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(message.splitlines()), 1)
    self.assertTrue(message.startswith("Exception IOError: "), message)
    self.assertTrue(message.endswith(" ignored"), message)
Guido van Rossum8358db22007-08-18 21:39:55 +00002062
Guido van Rossum9b76da62007-04-11 01:09:03 +00002063 # Systematic tests of the text I/O API
2064
def test_basic_io(self):
    # Write, reopen, then read/seek/tell/append for several chunk sizes
    # and encodings.  Files are opened with ``with`` so they are closed
    # even when an assertion fails part-way through.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        # NOTE: "utf-16-be" and "utf-16-le" were commented out of this
        # list in the original and are still not exercised.
        for enc in "ascii", "latin-1", "utf-8":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Opaque position of end-of-file, reused below.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                # Append, then read the appended text back from the cookie.
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2093
def multi_line_test(self, f, enc):
    # Helper: empty the file, write lines of many different lengths while
    # recording tell() before each one, then read them back with
    # readline() and check both positions and contents match.
    f.seek(0)
    f.truncate()
    alphabet = "s\xff\u0fff\uffff"
    lengths = (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000)
    written = []
    for length in lengths:
        # Cycle through the sample characters to build a line of `length`.
        body = "".join(alphabet[k % len(alphabet)] for k in range(length))
        text = body + "\n"
        written.append((f.tell(), text))
        f.write(text)
    f.seek(0)
    seen = []
    offset = f.tell()
    text = f.readline()
    while text:
        seen.append((offset, text))
        offset = f.tell()
        text = f.readline()
    self.assertEqual(seen, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002115
def test_telling(self):
    # tell() must report the same positions while reading back as it did
    # while writing, and must raise IOError while the file is being
    # iterated with a for loop.
    # The context manager closes the file even if an assertion fails.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            self.assertRaises(IOError, f.tell)
        # Once iteration is exhausted tell() works again.
        self.assertEqual(f.tell(), p2)
2135
def test_seeking(self):
    # Write two copies of a line whose ASCII prefix is two characters
    # shorter than the default chunk size and whose suffix is a multi-byte
    # character plus newline, then check read()/tell()/readline() there.
    prefix_size = _default_chunk_size() - 2
    u_prefix = "a" * prefix_size
    prefix = bytes(u_prefix.encode("utf-8"))
    # Pure ASCII: character count and byte count must coincide.
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    payload = prefix + bytes(u_suffix.encode("utf-8"))
    with self.open(support.TESTFN, "wb") as out:
        out.write(payload * 2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as inp:
        head = inp.read(prefix_size)
        self.assertEqual(head, str(prefix, "ascii"))
        self.assertEqual(inp.tell(), prefix_size)
        self.assertEqual(inp.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002152
def test_seeking_too(self):
    # Regression test for a specific bug: readline() followed by tell()
    # on a three-byte UTF-8 character with a chunk size of 2.
    with self.open(support.TESTFN, "wb") as out:
        out.write(b'\xe0\xbf\xbf\n')
    with self.open(support.TESTFN, "r", encoding="utf-8") as inp:
        # Plain attribute access: fails if _CHUNK_SIZE does not exist.
        inp._CHUNK_SIZE
        inp._CHUNK_SIZE = 2
        inp.readline()
        inp.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002163
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file below is opened in a with-block so that a failing
        # assertion cannot leave the handle open.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                # NOTE(review): these re-opened files keep their default
                # chunk size; only the reference read above sets
                # CHUNK_SIZE. Preserved from the original -- confirm
                # this is intended.
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for raw, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(raw)

        # Position each test case so that it crosses a chunk boundary.
        for raw, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(raw)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + raw, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002210
def test_encoded_writes(self):
    # Two consecutive writes through a UTF-16/UTF-32 wrapper must yield
    # exactly the bytes of a single encode() of the concatenated text.
    payload = "1234567890"
    doubled = payload * 2
    for encoding in ("utf-16", "utf-16-le", "utf-16-be",
                     "utf-32", "utf-32-le", "utf-32-be"):
        raw = self.BytesIO()
        wrapper = self.TextIOWrapper(raw, encoding=encoding)
        # Check if the BOM is written only once (see issue1753).
        for _ in range(2):
            wrapper.write(payload)
        # Read the whole stream back twice, rewinding each time.
        for _ in range(2):
            wrapper.seek(0)
            self.assertEqual(wrapper.read(), doubled)
        self.assertEqual(raw.getvalue(), doubled.encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002230
def test_unreadable(self):
    # read() on a wrapper whose buffer reports readable() == False must
    # raise IOError.
    class UnReadable(self.BytesIO):
        def readable(self):
            return False
    wrapper = self.TextIOWrapper(UnReadable())
    with self.assertRaises(IOError):
        wrapper.read()
2237
def test_read_one_by_one(self):
    # Reading one character at a time must still translate "\r\n" to "\n".
    txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    # iter() with a sentinel calls read(1) until it returns "".
    collected = "".join(iter(lambda: txt.read(1), ""))
    self.assertEqual(collected, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002247
def test_readlines(self):
    # readlines() with no hint, a None hint and a size hint of 5.
    txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
    every_line = ["AA\n", "BB\n", "CC"]
    self.assertEqual(txt.readlines(), every_line)
    txt.seek(0)
    self.assertEqual(txt.readlines(None), every_line)
    txt.seek(0)
    # With a hint of 5, only the first two lines are returned.
    self.assertEqual(txt.readlines(5), every_line[:2])
2255
# read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
def test_read_by_chunk(self):
    # make sure "\r\n" straddles 128 char boundary.
    source = b"A" * 127 + b"\r\nB"
    txt = self.TextIOWrapper(self.BytesIO(source))
    # Keep calling read(128) until it returns the empty string.
    collected = "".join(iter(lambda: txt.read(128), ""))
    self.assertEqual(collected, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002267
def test_issue1395_1(self):
    # Reading self.testdata one char at a time must give self.normalized.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    collected = "".join(iter(lambda: txt.read(1), ""))
    self.assertEqual(collected, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002279
def test_issue1395_2(self):
    # Same as above, but reading four characters at a time with the
    # wrapper's chunk size also set to 4.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4
    collected = "".join(iter(lambda: txt.read(4), ""))
    self.assertEqual(collected, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002291
def test_issue1395_3(self):
    # Mix two read(4) calls with three readline() calls; the pieces
    # joined in call order must equal self.normalized.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    pieces = []
    for _ in range(2):
        pieces.append(txt.read(4))
    for _ in range(3):
        pieces.append(txt.readline())
    self.assertEqual("".join(pieces), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002302
def test_issue1395_4(self):
    # A short read(4) followed by an unbounded read() must together
    # return self.normalized.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    head = txt.read(4)
    tail = txt.read()
    self.assertEqual(head + tail, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002310
def test_issue1395_5(self):
    # A position obtained from tell() after read(4) must still be valid
    # after seeking away to 0 and back.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    txt.read(4)  # result intentionally unused; only advances the stream
    saved = txt.tell()
    for target in (0, saved):
        txt.seek(target)
    self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002320
def test_issue2282(self):
    # The wrapper must report the same seekable() value as its buffer.
    raw = self.BytesIO(self.testdata)
    wrapper = self.TextIOWrapper(raw, encoding="ascii")

    expected = raw.seekable()
    self.assertEqual(expected, wrapper.seekable())
2326
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as writer:
            writer.write('aaa')
            pos = writer.tell()  # value unused; tell() is still exercised
        with self.open(path, 'rb') as reader:
            on_disk = reader.read()
            self.assertEqual(on_disk, 'aaa'.encode(charset))

        with self.open(path, 'a', encoding=charset) as appender:
            appender.write('xxx')
        with self.open(path, 'rb') as reader:
            on_disk = reader.read()
            self.assertEqual(on_disk, 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002341
def test_seek_bom(self):
    # Same test, but when seeking manually
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as writer:
            writer.write('aaa')
            end_of_first = writer.tell()
        with self.open(path, 'r+', encoding=charset) as editor:
            # Write past the first chunk, then overwrite the first chunk.
            for target, text in ((end_of_first, 'zzz'), (0, 'bbb')):
                editor.seek(target)
                editor.write(text)
        with self.open(path, 'rb') as reader:
            on_disk = reader.read()
            self.assertEqual(on_disk, 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002356
def test_errors_property(self):
    # The errors attribute is "strict" when no handler is given and
    # otherwise echoes the handler passed to open().
    cases = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for extra, expected in cases:
        with self.open(support.TESTFN, "w", **extra) as f:
            self.assertEqual(f.errors, expected)
2362
@support.no_tracing
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    start_gate = threading.Event()
    worker_count = 20
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def run(n):
            text = "Thread%03d\n" % n
            # Block until the main thread opens the gate.
            start_gate.wait()
            f.write(text)
        workers = []
        for index in range(worker_count):
            workers.append(threading.Thread(target=run, args=(index,)))
        for worker in workers:
            worker.start()
        # Short pause before opening the gate for all workers at once.
        time.sleep(0.02)
        start_gate.set()
        for worker in workers:
            worker.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
        # Each thread's line must appear exactly once.
        for index in range(worker_count):
            marker = "Thread%03d\n" % index
            self.assertEqual(content.count(marker), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002385
def test_flush_error_on_close(self):
    # An IOError raised by flush() must propagate out of close().
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    def bad_flush():
        raise IOError()
    wrapper.flush = bad_flush
    with self.assertRaises(IOError):  # exception not swallowed
        wrapper.close()
2392
def test_multi_close(self):
    # close() may be called repeatedly; flush() afterwards must raise
    # ValueError.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    for _ in range(3):
        wrapper.close()
    with self.assertRaises(ValueError):
        wrapper.flush()
2399
def test_unseekable(self):
    # tell() and seek() on a wrapper around MockUnseekableIO must raise
    # UnsupportedOperation.
    wrapper = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.tell()
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.seek(0)
2404
def test_readonly_attributes(self):
    # Assigning to the wrapper's buffer attribute must raise
    # AttributeError.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    replacement = self.BytesIO(self.testdata)
    with self.assertRaises(AttributeError):
        setattr(wrapper, "buffer", replacement)
2410
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    wrapper = self.TextIOWrapper(self.MockRawIO(chunks),
                                 encoding='ascii', newline='\n')
    # Reads: a sized read, one readline, then iteration over the rest.
    first = wrapper.read(4)
    self.assertEqual(first, 'abcd')
    second = wrapper.readline()
    self.assertEqual(second, 'efghi\n')
    remaining = list(wrapper)
    self.assertEqual(remaining, ['jkl\n', 'opq\n'])
2421
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                                 write_through=True)
    for fragment in ('1', '23\n4', '5'):
        wrapper.write(fragment)
    # No flush() was called; the raw object's write stack must already
    # hold all of the bytes.
    received = b''.join(raw._write_stack)
    self.assertEqual(received, b'123\n45')
2431
class CTextIOWrapperTest(TextIOWrapperTest):
    # Adds tests on top of the ones inherited from TextIOWrapperTest; the
    # comments inside describe them as specific to the C TextIOWrapper.

    def test_initialization(self):
        # A failed re-initialisation (bad newline type or value) must
        # leave the wrapper unusable: read() raises ValueError afterwards.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        with self.assertRaises(TypeError):
            wrapper.__init__(buffered, newline=42)
        with self.assertRaises(ValueError):
            wrapper.read()
        with self.assertRaises(ValueError):
            wrapper.__init__(buffered, newline='xyzzy')
        with self.assertRaises(ValueError):
            wrapper.read()

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        rawio = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(rawio)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        # Self-reference: only the cycle collector can free the wrapper.
        wrapper.x = wrapper
        ref = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertTrue(ref() is None, ref)
        with self.open(support.TESTFN, "rb") as f:
            on_disk = f.read()
            self.assertEqual(on_disk, b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references
            text_a.buddy = text_b
            text_b.buddy = text_a
        support.gc_collect()
2472
2473
# Runs the tests inherited from TextIOWrapperTest without adding or
# overriding any. Presumably the pure-Python implementation is bound to
# this class by setup code outside this view -- TODO confirm.
class PyTextIOWrapperTest(TextIOWrapperTest):
    pass
2476
2477
class IncrementalNewlineDecoderTest(unittest.TestCase):
    # Tests for self.IncrementalNewlineDecoder. That attribute is not
    # defined in this class body, so it must be supplied from elsewhere.

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def _check_decode(b, s, **kwargs):
            # We exercise getstate() / setstate() as well as decode()
            saved = decoder.getstate()
            self.assertEqual(decoder.decode(b, **kwargs), s)
            decoder.setstate(saved)
            self.assertEqual(decoder.decode(b, **kwargs), s)

        # The decoder is stateful, so the order of every step matters.
        before_reset = (
            (b'\xe8\xa2\x88', "\u8888"),

            (b'\xe8', ""),
            (b'\xa2', ""),
            (b'\x88', "\u8888"),

            (b'\xe8', ""),
            (b'\xa2', ""),
            (b'\x88', "\u8888"),

            (b'\xe8', ""),
        )
        for raw, text in before_reset:
            _check_decode(raw, text)
        # A dangling lead byte must be rejected when the input is final.
        with self.assertRaises(UnicodeDecodeError):
            decoder.decode(b'', final=True)

        decoder.reset()
        # Each entry: (input bytes, expected text, keyword arguments).
        after_reset = (
            (b'\n', "\n", {}),
            (b'\r', "", {}),
            (b'', "\n", {"final": True}),
            (b'\r', "\n", {"final": True}),

            (b'\r', "", {}),
            (b'a', "\na", {}),

            (b'\r\r\n', "\n\n", {}),
            (b'\r', "", {}),
            (b'\r', "\n", {}),
            (b'\na', "\na", {}),

            (b'\xe8\xa2\x88\r\n', "\u8888\n", {}),
            (b'\xe8\xa2\x88', "\u8888", {}),
            (b'\n', "\n", {}),
            (b'\xe8\xa2\x88\r', "\u8888", {}),
            (b'\n', "\n", {}),
        )
        for raw, text, options in after_reset:
            _check_decode(raw, text, **options)

    def check_newline_decoding(self, decoder, encoding):
        # Feed text to the decoder one unit at a time and check both the
        # translated output and the evolution of decoder.newlines.
        result = []
        if encoding is None:
            encoder = None
        else:
            encoder = codecs.getincrementalencoder(encoding)()

        def _decode_bytewise(s):
            if encoder is None:
                # Decode one char at a time
                units = s
            else:
                # Decode one byte at a time
                units = (bytes([b]) for b in encoder.encode(s))
            for unit in units:
                result.append(decoder.decode(unit))

        self.assertEqual(decoder.newlines, None)
        # Each entry: (text to feed, expected decoder.newlines afterwards).
        steps = (
            ("abc\n\r", '\n'),
            ("\nabc", ('\n', '\r\n')),
            ("abc\r", ('\n', '\r\n')),
            ("abc", ('\r', '\n', '\r\n')),
        )
        for chunk, seen in steps:
            _decode_bytewise(chunk)
            self.assertEqual(decoder.newlines, seen)
        _decode_bytewise("abc\r")
        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
        # After reset() the decoder must have forgotten all newlines.
        decoder.reset()
        payload = "abc"
        if encoder is not None:
            encoder.reset()
            payload = encoder.encode(payload)
        self.assertEqual(decoder.decode(payload), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc:
                inner = codecs.getincrementaldecoder(enc)()
            else:
                inner = enc
            wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(wrapped, enc)
        inner = codecs.getincrementaldecoder("utf-8")()
        wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding_utf8(wrapped)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def _check(dec):
            # U+0D00 and U+0A00 are not newlines and must pass through
            # unchanged without being recorded in dec.newlines.
            self.assertEqual(dec.newlines, None)
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)
        for translate in (False, True):
            dec = self.IncrementalNewlineDecoder(None, translate=translate)
            _check(dec)
2583
# Runs the inherited IncrementalNewlineDecoderTest tests unchanged.
# Presumably the C IncrementalNewlineDecoder is bound to this class by
# setup code outside this view -- TODO confirm.
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    pass
2586
# Runs the inherited IncrementalNewlineDecoderTest tests unchanged.
# Presumably the pure-Python IncrementalNewlineDecoder is bound to this
# class by setup code outside this view -- TODO confirm.
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002589
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002590
Guido van Rossum01a27522007-03-07 01:00:12 +00002591# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002592
Guido van Rossum5abbf752007-08-27 17:39:33 +00002593class MiscIOTest(unittest.TestCase):
2594
def tearDown(self):
    # Remove the scratch file the tests in this class write to.
    scratch = support.TESTFN
    support.unlink(scratch)
2597
def test___all__(self):
    # Every name exported by the io module under test must exist and be
    # of the expected kind.
    for name in self.io.__all__:
        obj = getattr(self.io, name, None)
        self.assertTrue(obj is not None, name)
        if name == "open":
            # open is exempt from the subclass checks below.
            continue
        looks_like_exception = ("error" in name.lower()
                                or name == "UnsupportedOperation")
        if looks_like_exception:
            self.assertTrue(issubclass(obj, Exception), name)
            continue
        if name.startswith("SEEK_"):
            # SEEK_* names are not subjected to a subclass check.
            continue
        self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002608
def test_attributes(self):
    # Check the name and mode attributes exposed at each layer of the
    # objects returned by open().
    # All files are opened in with-blocks so a failing assertion cannot
    # leave a handle open.
    with self.open(support.TESTFN, "wb", buffering=0) as f:
        self.assertEqual(f.mode, "wb")

    with self.open(support.TESTFN, "U") as f:
        self.assertEqual(f.name, support.TESTFN)
        self.assertEqual(f.buffer.name, support.TESTFN)
        self.assertEqual(f.buffer.raw.name, support.TESTFN)
        self.assertEqual(f.mode, "U")
        self.assertEqual(f.buffer.mode, "rb")
        self.assertEqual(f.buffer.raw.mode, "rb")

    with self.open(support.TESTFN, "w+") as f:
        self.assertEqual(f.mode, "w+")
        self.assertEqual(f.buffer.mode, "rb+")  # Does it really matter?
        self.assertEqual(f.buffer.raw.mode, "rb+")

        # A second object on the same descriptor; closefd=False means
        # closing g does not close the descriptor that f owns.
        with self.open(f.fileno(), "wb", closefd=False) as g:
            self.assertEqual(g.mode, "wb")
            self.assertEqual(g.raw.mode, "wb")
            self.assertEqual(g.name, f.fileno())
            self.assertEqual(g.raw.name, f.fileno())
2635
def test_io_after_close(self):
    # Every I/O entry point must raise ValueError once the file object
    # has been closed, for each mode/buffering combination below.
    open_variants = [
        {"mode": "w"},
        {"mode": "wb"},
        {"mode": "w", "buffering": 1},
        {"mode": "w", "buffering": 2},
        {"mode": "wb", "buffering": 0},
        {"mode": "r"},
        {"mode": "rb"},
        {"mode": "r", "buffering": 1},
        {"mode": "r", "buffering": 2},
        {"mode": "rb", "buffering": 0},
        {"mode": "w+"},
        {"mode": "w+b"},
        {"mode": "w+", "buffering": 1},
        {"mode": "w+", "buffering": 2},
        {"mode": "w+b", "buffering": 0},
    ]
    for kwargs in open_variants:
        f = self.open(support.TESTFN, **kwargs)
        f.close()
        # write() needs bytes for binary modes and str for text modes.
        empty = b"" if "b" in kwargs['mode'] else ""
        # (method name, positional arguments, only checked if present),
        # listed in the order the calls are made.
        checks = (
            ("flush", (), False),
            ("fileno", (), False),
            ("isatty", (), False),
            ("__iter__", (), False),
            ("peek", (1,), True),
            ("read", (), False),
            ("read1", (1024,), True),
            ("readall", (), True),
            ("readinto", (bytearray(1024),), True),
            ("readline", (), False),
            ("readlines", (), False),
            ("seek", (0,), False),
            ("tell", (), False),
            ("truncate", (), False),
            ("write", (empty,), False),
            ("writelines", ([],), False),
        )
        for method_name, call_args, optional in checks:
            if optional and not hasattr(f, method_name):
                continue
            self.assertRaises(ValueError, getattr(f, method_name),
                              *call_args)
        # Iteration via the next() builtin must fail as well.
        self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002678
def test_blockingioerror(self):
    # Various BlockingIOError issues
    # A BlockingIOError and its str-subclass argument that reference each
    # other must still be freed by the garbage collector.
    class C(str):
        pass
    message = C("")
    error = self.BlockingIOError(1, message)
    # Build the reference cycle in both directions.
    message.b = error
    error.c = message
    ref = weakref.ref(message)
    del message, error
    support.gc_collect()
    self.assertTrue(ref() is None, ref)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002691
def test_abcs(self):
    # Test the visible base classes are ABCs.
    visible_bases = (
        self.IOBase,
        self.RawIOBase,
        self.BufferedIOBase,
        self.TextIOBase,
    )
    for base in visible_bases:
        self.assertIsInstance(base, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002698
def _check_abc_inheritance(self, abcmodule):
    # Open the scratch file unbuffered-binary, buffered-binary and as
    # text, and check which ABCs of `abcmodule` each resulting object is
    # (and is not) an instance of.
    abc_names = ("IOBase", "RawIOBase", "BufferedIOBase", "TextIOBase")
    # (mode, extra open() keywords, expected membership per ABC above)
    cases = (
        ("wb", {"buffering": 0}, (True, True, False, False)),
        ("wb", {}, (True, False, True, False)),
        ("w", {}, (True, False, False, True)),
    )
    for mode, extra, membership in cases:
        with self.open(support.TESTFN, mode, **extra) as f:
            for abc_name, expected in zip(abc_names, membership):
                base = getattr(abcmodule, abc_name)
                if expected:
                    self.assertIsInstance(f, base)
                else:
                    self.assertNotIsInstance(f, base)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002715
2716 def test_abc_inheritance(self):
2717 # Test implementations inherit from their respective ABCs
2718 self._check_abc_inheritance(self)
2719
2720 def test_abc_inheritance_official(self):
2721 # Test implementations inherit from the official ABCs of the
2722 # baseline "io" module.
2723 self._check_abc_inheritance(io)
2724
def _check_warn_on_dealloc(self, *args, **kwargs):
    """Check that dropping an unclosed file emits a ResourceWarning.

    *args* and *kwargs* are forwarded to open().  The warning message must
    contain the repr() the file object had while it was still alive.
    """
    # NOTE(review): this calls the builtin open(), not self.open -- confirm
    # that is intended for the subclasses that set a different ``io``.
    handle = open(*args, **kwargs)
    expected_repr = repr(handle)
    with self.assertWarns(ResourceWarning) as caught:
        # Drop the only reference, then force a collection so that the
        # object is finalized inside the assertWarns block.
        handle = None
        support.gc_collect()
    message = str(caught.warning.args[0])
    self.assertIn(expected_repr, message)
2732
def test_warn_on_dealloc(self):
    # Exercise the dealloc warning for an unbuffered binary, a buffered
    # binary and a text file, in that order.
    for mode, extra in (("wb", {"buffering": 0}), ("wb", {}), ("w", {})):
        self._check_warn_on_dealloc(support.TESTFN, mode, **extra)
2737
def _check_warn_on_dealloc_fd(self, *args, **kwargs):
    """Check the dealloc ResourceWarning for files opened on a descriptor.

    *args* and *kwargs* are forwarded to open() after the descriptor.  A
    file owning its descriptor must warn when dropped unclosed; a file
    opened with closefd=False must not.
    """
    fds = []
    def cleanup_fds():
        # Close every pipe end created below.  EBADF only means the
        # descriptor was already closed, so it is ignored.
        for fd in fds:
            try:
                os.close(fd)
            except OSError as e:
                if e.errno != errno.EBADF:
                    raise
    self.addCleanup(cleanup_fds)
    r, w = os.pipe()
    fds += r, w
    self._check_warn_on_dealloc(r, *args, **kwargs)
    # When using closefd=False, there's no warning
    r, w = os.pipe()
    fds += r, w
    with warnings.catch_warnings(record=True) as recorded:
        # Make sure a ResourceWarning would actually be recorded; if the
        # active filters suppressed it, the assertion below would pass
        # without checking anything.
        warnings.filterwarnings("always", category=ResourceWarning)
        open(r, *args, closefd=False, **kwargs)
        support.gc_collect()
    self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00002758
2759 def test_warn_on_dealloc_fd(self):
2760 self._check_warn_on_dealloc_fd("rb", buffering=0)
2761 self._check_warn_on_dealloc_fd("rb")
2762 self._check_warn_on_dealloc_fd("r")
2763
2764
def test_pickling(self):
    # Pickling file objects is forbidden
    # For each base mode, try the text file, the buffered binary file and
    # the unbuffered binary file.  Order matters: the "w" modes come first
    # and create the file that the "r" modes then open.
    open_kwargs = []
    for base_mode in ("w", "r", "w+"):
        binary_mode = base_mode + "b"
        open_kwargs.append({"mode": base_mode})
        open_kwargs.append({"mode": binary_mode})
        open_kwargs.append({"mode": binary_mode, "buffering": 0})
    protocols = range(pickle.HIGHEST_PROTOCOL + 1)
    for kwargs in open_kwargs:
        for protocol in protocols:
            with self.open(support.TESTFN, **kwargs) as f:
                self.assertRaises(TypeError, pickle.dumps, f, protocol)
2781
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01002782 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2783 def test_nonblock_pipe_write_bigbuf(self):
2784 self._test_nonblock_pipe_write(16*1024)
2785
2786 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2787 def test_nonblock_pipe_write_smallbuf(self):
2788 self._test_nonblock_pipe_write(1024)
2789
2790 def _set_non_blocking(self, fd):
2791 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
2792 self.assertNotEqual(flags, -1)
2793 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
2794 self.assertEqual(res, 0)
2795
2796 def _test_nonblock_pipe_write(self, bufsize):
2797 sent = []
2798 received = []
2799 r, w = os.pipe()
2800 self._set_non_blocking(r)
2801 self._set_non_blocking(w)
2802
2803 # To exercise all code paths in the C implementation we need
2804 # to play with buffer sizes. For instance, if we choose a
2805 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
2806 # then we will never get a partial write of the buffer.
2807 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
2808 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
2809
2810 with rf, wf:
2811 for N in 9999, 73, 7574:
2812 try:
2813 i = 0
2814 while True:
2815 msg = bytes([i % 26 + 97]) * N
2816 sent.append(msg)
2817 wf.write(msg)
2818 i += 1
2819
2820 except self.BlockingIOError as e:
2821 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01002822 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01002823 sent[-1] = sent[-1][:e.characters_written]
2824 received.append(rf.read())
2825 msg = b'BLOCKED'
2826 wf.write(msg)
2827 sent.append(msg)
2828
2829 while True:
2830 try:
2831 wf.flush()
2832 break
2833 except self.BlockingIOError as e:
2834 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01002835 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01002836 self.assertEqual(e.characters_written, 0)
2837 received.append(rf.read())
2838
2839 received += iter(rf.read, None)
2840
2841 sent, received = b''.join(sent), b''.join(received)
2842 self.assertTrue(sent == received)
2843 self.assertTrue(wf.closed)
2844 self.assertTrue(rf.closed)
2845
def test_create_fail(self):
    # 'x' mode fails if file is existing
    # Create the file first, then exclusive creation must be refused.
    with self.open(support.TESTFN, 'w'):
        pass
    with self.assertRaises(FileExistsError):
        self.open(support.TESTFN, 'x')
2851
def test_create_writes(self):
    # 'x' mode opens for writing
    # Data written through an 'xb' file must be readable afterwards.
    payload = b"spam"
    with self.open(support.TESTFN, 'xb') as created:
        created.write(payload)
    with self.open(support.TESTFN, 'rb') as reopened:
        contents = reopened.read()
        self.assertEqual(b"spam", contents)
2858
class CMiscIOTest(MiscIOTest):
    # Run the shared MiscIOTest checks against the module bound to the
    # global name ``io``.
    io = io
2861
class PyMiscIOTest(MiscIOTest):
    # Run the shared MiscIOTest checks against the module bound to the
    # global name ``pyio``.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002864
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002865
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Tests of how file objects behave when SIGALRM interrupts their I/O.

    The module under test is read from ``self.io``, which this class does
    not define itself.  Every scenario arms a one second alarm with
    signal.alarm() and performs reads or writes on an os.pipe().
    """

    def setUp(self):
        # Install a handler that raises, remembering the previous handler
        # so that tearDown() can restore it.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        # Cancel any alarm that is still pending, e.g. because a test
        # failed before the timer fired.  Otherwise SIGALRM could be
        # delivered after the previous handler has been restored.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Raise ZeroDivisionError so that the interruption is observable.
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter."""
        read_results = []
        def _read():
            # Keep SIGALRM away from this helper thread where possible.
            if hasattr(signal, 'pthread_sigmask'):
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # Bound before the try block so that the cleanup below cannot fail
        # with a NameError when open() itself raises.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            signal.alarm(1)
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed above.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            self.assertRaises(ZeroDivisionError,
                        wio.write, item * (support.PIPE_MAX_SIZE // len(item)))
            t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check what happens when a signal handler writes to a file that
        is already in the middle of a write in the same thread."""
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully."""
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound before the try block so that the cleanup below cannot fail
        # with a NameError when open() itself raises.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully."""
        select = support.import_module("select")
        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = 1024 * 1024
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        def _read():
            while not write_finished:
                while r in select.select([r], [], [], 1.0)[0]:
                    s = os.read(r, 1024)
                    read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: switch to the second handler and re-arm.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            # Second alarm: start draining the pipe.
            t.start()
        signal.signal(signal.SIGALRM, alarm1)
        # Bound before the try block so that the cleanup below cannot fail
        # with a NameError when open() itself raises.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            self.assertEqual(N, wio.write(item * N))
            wio.flush()
            write_finished = True
            t.join()
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3055
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003056
class CSignalsTest(SignalsTest):
    # Run the SignalsTest scenarios against the module bound to the global
    # name ``io``.
    io = io
3059
class PySignalsTest(SignalsTest):
    # Run the SignalsTest scenarios against the module bound to the global
    # name ``pyio``.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = test_reentrant_write_text = None
3067
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003068
def test_main():
    """Populate the C*/Py* test classes with their namespaces and run them.

    Each test class whose name starts with "C" receives the public names
    of ``io`` plus the C-prefixed mock classes as class attributes; each
    class whose name starts with "Py" receives the ``pyio`` equivalents.
    Other classes are left untouched.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = dict((name, getattr(io, name)) for name in all_members)
    py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
    # The implementation-specific mocks live at module level under the
    # generic mock name with a "C" or "Py" prefix.
    module_globals = globals()
    for mock in mocks:
        c_io_ns[mock.__name__] = module_globals["C" + mock.__name__]
    for mock in mocks:
        py_io_ns[mock.__name__] = module_globals["Py" + mock.__name__]
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    # The first matching prefix wins, mirroring an if/elif chain.
    namespaces = (("C", c_io_ns), ("Py", py_io_ns))
    for test in tests:
        for prefix, namespace in namespaces:
            if test.__name__.startswith(prefix):
                for name, obj in namespace.items():
                    setattr(test, name, obj)
                break

    support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00003103
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()