blob: 96258b4ccd7aba2001bb02c48d85f7f1619d05e7 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import random
Guido van Rossum28524c72007-02-27 05:47:44 +000027import unittest
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000028import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000029import abc
Antoine Pitroub46b9d52010-08-21 19:09:32 +000030import signal
31import errno
Antoine Pitroue033e062010-10-29 10:38:18 +000032import warnings
Antoine Pitrou243757e2010-11-05 21:15:39 +000033import pickle
Georg Brandl1b37e872010-03-14 10:45:50 +000034from itertools import cycle, count
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000035from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000036from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000037
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000038import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000039import io # C implementation of io
40import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000041try:
42 import threading
43except ImportError:
44 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010045try:
46 import fcntl
47except ImportError:
48 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000049
def _default_chunk_size():
    """Return the ``_CHUNK_SIZE`` reported by a freshly opened text file.

    This module's own source file is opened in text mode only to obtain a
    text-mode file object; the file is closed again before returning.
    """
    with open(__file__, "r", encoding="latin-1") as probe:
        chunk_size = probe._CHUNK_SIZE
    return chunk_size
54
55
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto().

    ``read_stack`` is a sequence of canned results handed out one by one by
    readinto(); a ``None`` entry makes readinto() return ``None``.  Everything
    passed to write() is recorded in ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        self._extraneous_reads = 0  # readinto() calls made on an empty stack
        self._reads = 0             # total number of readinto() calls
        self._write_stack = []
        self._read_stack = list(read_stack)

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def fileno(self):
        return 42

    def seek(self, pos, whence):
        # Wrong, but something has to be returned.
        return 0

    def tell(self):
        # Just as wrong as seek().
        return 0

    def truncate(self, pos=None):
        return pos

    def write(self, b):
        # Record an immutable snapshot of whatever buffer was passed in.
        snapshot = bytes(b)
        self._write_stack.append(snapshot)
        return len(b)

    def readinto(self, buf):
        """Copy the next canned chunk into *buf*.

        Returns the number of bytes copied, ``None`` when the next canned
        entry is ``None``, or 0 once the stack is exhausted.
        """
        self._reads += 1
        capacity = len(buf)
        pending = self._read_stack
        if not pending:
            self._extraneous_reads += 1
            return 0
        chunk = pending[0]
        if chunk is None:
            pending.pop(0)
            return None
        size = len(chunk)
        if size > capacity:
            # Only part of the chunk fits; keep the remainder for next time.
            buf[:] = chunk[:capacity]
            pending[0] = chunk[capacity:]
            return capacity
        pending.pop(0)
        buf[:size] = chunk
        return size
111
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead combined with ``io.RawIOBase``."""

class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead combined with ``pyio.RawIOBase``."""
117
118
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus an explicit read() method.

    read() hands out the canned entries of ``_read_stack`` whole, one per
    call, ignoring the requested size.
    """

    def read(self, n=None):
        """Return the next canned entry, or ``b""`` once none are left.

        Every call is counted in ``_reads``; calls made after the stack has
        been exhausted are additionally counted in ``_extraneous_reads``.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an empty stack is expected here; anything else (including
            # KeyboardInterrupt) must propagate instead of being swallowed.
            self._extraneous_reads += 1
            return b""
128
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO combined with ``io.RawIOBase``."""

class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO combined with ``pyio.RawIOBase``."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000134
Guido van Rossuma9e20242007-03-08 00:43:48 +0000135
class MisbehavedRawIO(MockRawIO):
    """A MockRawIO whose methods deliberately report bogus results."""

    def seek(self, pos, whence):
        # A negative position, whatever was asked for.
        return -123

    def tell(self):
        return -456

    def write(self, b):
        # Claim that twice as much was written as the parent reports.
        genuine = super().write(b)
        return genuine * 2

    def read(self, n=None):
        # Hand back the parent's result repeated twice.
        genuine = super().read(n)
        return genuine * 2

    def readinto(self, buf):
        # Let the parent fill the buffer, then report five times its size.
        super().readinto(buf)
        return len(buf) * 5
152
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO combined with ``io.RawIOBase``."""

class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO combined with ``pyio.RawIOBase``."""
158
159
class CloseFailureIO(MockRawIO):
    """A MockRawIO whose first close() call raises IOError."""

    # Class-level default; close() shadows it on the instance.
    closed = 0

    def close(self):
        # Later calls are silent no-ops.
        if self.closed:
            return
        # Mark the object closed *before* raising, so only one call fails.
        self.closed = 1
        raise IOError
167
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO combined with ``io.RawIOBase``."""

class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO combined with ``pyio.RawIOBase``."""
173
174
class MockFileIO:
    """Mixin that records the size of every read()/readinto() result.

    It must be combined with a class providing ``__init__(data)``, read() and
    readinto(); those are reached through super().
    """

    def __init__(self, data):
        # Created before delegating, so the attribute already exists when the
        # next class in the MRO is initialised.
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        result = super().read(n)
        if result is None:
            logged = None
        else:
            logged = len(result)
        self.read_history.append(logged)
        return result

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000190
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO combined with ``io.BytesIO``."""

class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO combined with ``pyio.BytesIO``."""
196
197
class MockUnseekableIO:
    """Mixin that makes a stream refuse seek() and tell().

    The class it is mixed into must provide an ``UnsupportedOperation``
    attribute naming the exception type to raise.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        refusal = self.UnsupportedOperation("not seekable")
        raise refusal

    def tell(self, *args):
        refusal = self.UnsupportedOperation("not seekable")
        raise refusal
207
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """MockUnseekableIO combined with ``io.BytesIO``."""
    # Exception type raised by the mixin's seek() and tell().
    UnsupportedOperation = io.UnsupportedOperation

class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """MockUnseekableIO combined with ``pyio.BytesIO``."""
    # Exception type raised by the mixin's seek() and tell().
    UnsupportedOperation = pyio.UnsupportedOperation
213
214
class MockNonBlockWriterIO:
    """A writable mock that can pretend a write would block.

    Written data accumulates in ``_write_stack``.  After block_on(char), a
    write containing *char* is cut short just before it; a write that starts
    with *char* returns ``None`` and clears the blocker.
    """

    def __init__(self):
        self._blocker_char = None
        self._write_stack = []

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def pop_written(self):
        """Return everything written so far and forget it."""
        written = b"".join(self._write_stack)
        # Empty the list in place rather than rebinding the attribute.
        del self._write_stack[:]
        return written

    def write(self, b):
        payload = bytes(b)
        blocker = self._blocker_char
        if blocker:
            cut = payload.find(blocker)
            if cut > 0:
                # write data up to the first blocker
                self._write_stack.append(payload[:cut])
                return cut
            if cut == 0:
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        # No blocker set, or the blocker does not occur in the payload.
        self._write_stack.append(payload)
        return len(payload)
258
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO combined with ``io.RawIOBase``."""
    # Exposed so that code holding only the mock can name the exception type.
    BlockingIOError = io.BlockingIOError

class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO combined with ``pyio.RawIOBase``."""
    # Exposed so that code holding only the mock can name the exception type.
    BlockingIOError = pyio.BlockingIOError
264
Guido van Rossuma9e20242007-03-08 00:43:48 +0000265
class IOTest(unittest.TestCase):
    """Implementation-independent tests for opening, reading and writing.

    The implementation under test is reached only through attributes of
    ``self`` that this class does not define itself: ``open``, ``FileIO``,
    ``BytesIO``, ``StringIO``, ``IOBase``, ``RawIOBase``, ``BufferedIOBase``,
    ``TextIOBase``, ``UnsupportedOperation``, ``SEEK_CUR``, ``SEEK_END`` and
    ``MockRawIOWithoutRead``.  They have to be supplied from outside.
    """

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        # Shared helper: leaves b"hello world\n" in the writable binary
        # stream *f* while checking write/seek/tell/truncate results.
        self.assertEqual(f.write(b"blah."), 5)
        f.truncate(0)
        # truncate() must not move the file position.
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        # Shared helper: *f* must be a readable binary stream containing
        # b"hello world\n".  With buffered=True, size-less read() calls are
        # checked as well.
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    LARGE = 2**31

    def large_file_ops(self, f):
        # Shared helper: exercises seek/tell/write/truncate around offset
        # self.LARGE on the read-write binary stream *f*.
        # assertTrue() rather than a bare assert, so the precondition is
        # still checked when Python runs with -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        exc = self.UnsupportedOperation
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(exc, fp.read)
                self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "wb", buffering=0) as fp:
            self.assertRaises(exc, fp.read)
            self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "rb", buffering=0) as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(exc, fp.write, "blah")
            self.assertRaises(exc, fp.writelines, ["blah\n"])
            # Non-zero seeking from current or end pos
            self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
            self.assertRaises(exc, fp.seek, -1, self.SEEK_END)

    def test_open_handles_NUL_chars(self):
        fn_with_NUL = 'foo\0bar'
        self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
        self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')

    def test_raw_file_io(self):
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform.startswith('win') or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                # Report a skip instead of silently passing.
                self.skipTest(
                    "large file ops need %d bytes and a long time on %s; "
                    "use 'regrtest.py -u largefile test_io' to run it"
                    % (self.LARGE, sys.platform))
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                # The file must be closed even when the body raises.
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        with support.check_warnings(('', ResourceWarning)):
            f = MyFileIO(support.TESTFN, "wb")
            f.write(b"xxx")
            del f
            support.gc_collect()
            self.assertEqual(record, [1, 2, 3])
            with self.open(support.TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        # Shared helper: an instance of a subclass of *base* must call
        # __del__, close() and flush(), in that order, when it is destroyed.
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        a = array.array('i', range(10))
        n = len(a.tobytes())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        with support.check_warnings(('', ResourceWarning)):
            f = self.FileIO(support.TESTFN, "wb")
            f.write(b"abcxxx")
            # Put the object in a reference cycle so that only the garbage
            # collector can reclaim it.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed

    def test_multi_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")

    def test_types_have_dict(self):
        test = (
            self.IOBase(),
            self.RawIOBase(),
            self.TextIOBase(),
            self.StringIO(),
            self.BytesIO()
        )
        for obj in test:
            self.assertTrue(hasattr(obj, "__dict__"))

    def test_opener(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        fd = os.open(support.TESTFN, os.O_RDONLY)
        def opener(path, flags):
            # Ignore the requested path and hand back the descriptor
            # opened above.
            return fd
        with self.open("non-existent", "r", opener=opener) as f:
            self.assertEqual(f.read(), "egg\n")
645
class CIOTest(IOTest):
    """IOTest plus a finalization regression test."""

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        # Self-reference: only the garbage collector can reclaim the object.
        obj.obj = obj
        wr = weakref.ref(obj)
        del MyIO
        del obj
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000665
class PyIOTest(IOTest):
    """IOTest with no additional test methods."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000668
Guido van Rossuma9e20242007-03-08 00:43:48 +0000669
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom.
    # This is a mixin: the concrete test case provides the type under test
    # as ``tp`` together with the mock raw I/O classes used below.

    def test_detach(self):
        # detach() hands back the raw stream; a second call must fail.
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        self.assertIs(buffered.detach(), raw_stream)
        with self.assertRaises(ValueError):
            buffered.detach()

    def test_fileno(self):
        # The buffered object is expected to report 42 as its descriptor.
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)

        self.assertEqual(42, buffered.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        # Invalid whence
        for bad_whence in (-1, 3):
            with self.assertRaises(ValueError):
                buffered.seek(0, bad_whence)

    def test_override_destructor(self):
        # Each overridden hook logs a marker before delegating to the
        # parent, so the list records which hooks ran and in what order.
        base = self.tp
        calls = []
        class MyBufferedIO(base):
            def __del__(self):
                calls.append(1)
                try:
                    parent_del = super().__del__
                except AttributeError:
                    pass
                else:
                    parent_del()
            def close(self):
                calls.append(2)
                super().close()
            def flush(self):
                calls.append(3)
                super().flush()
        raw_stream = self.MockRawIO()
        bufio = MyBufferedIO(raw_stream)
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        # flush() is only expected in the log for writable objects.
        expected = [1, 2, 3] if writable else [1, 2]
        self.assertEqual(calls, expected)

    def test_context_manager(self):
        # Test usability as a context manager
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        def enter_and_exit():
            with buffered:
                pass
        enter_and_exit()
        # The object should now be closed, so entering it a second time
        # should raise a ValueError.
        with self.assertRaises(ValueError):
            enter_and_exit()

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        failing_raw = self.CloseFailureIO()
        def touch_missing_attribute():
            self.tp(failing_raw).xyzzy
        with support.captured_output("stderr") as captured:
            self.assertRaises(AttributeError, touch_missing_attribute)
        text = captured.getvalue().strip()
        if text:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(text.splitlines()), 1)
            self.assertTrue(text.startswith("Exception IOError: "), text)
            self.assertTrue(text.endswith(" ignored"), text)

    def test_repr(self):
        # repr() shows the qualified type name and, once set, the raw
        # stream's name.
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        qualified = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(buffered), "<%s>" % qualified)
        raw_stream.name = "dummy"
        self.assertEqual(repr(buffered), "<%s name='dummy'>" % qualified)
        raw_stream.name = b"dummy"
        self.assertEqual(repr(buffered), "<%s name=b'dummy'>" % qualified)

    def test_flush_error_on_close(self):
        raw_stream = self.MockRawIO()
        def bad_flush():
            raise IOError()
        raw_stream.flush = bad_flush
        buffered = self.tp(raw_stream)
        with self.assertRaises(IOError):  # exception not swallowed
            buffered.close()

    def test_multi_close(self):
        # Closing repeatedly is allowed; flushing afterwards is not.
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        for _ in range(3):
            buffered.close()
        with self.assertRaises(ValueError):
            buffered.flush()

    def test_unseekable(self):
        buffered = self.tp(self.MockUnseekableIO(b"A" * 10))
        unsupported = self.UnsupportedOperation
        with self.assertRaises(unsupported):
            buffered.tell()
        with self.assertRaises(unsupported):
            buffered.seek(0)

    def test_readonly_attributes(self):
        # The ``raw`` attribute cannot be rebound.
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        replacement = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buffered.raw = replacement
789
Guido van Rossum78892e42007-04-06 17:31:18 +0000790
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Reader-side tests; subclasses bind ``tp`` to the type under test.
    read_mode = "rb"

    def test_constructor(self):
        raw_stream = self.MockRawIO([b"abc"])
        reader = self.tp(raw_stream)
        # Calling __init__ again on a live object is accepted for valid
        # buffer sizes...
        reader.__init__(raw_stream)
        for size in (1024, 16):
            reader.__init__(raw_stream, buffer_size=size)
        self.assertEqual(b"abc", reader.read())
        # ...and rejected for non-positive ones.
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                reader.__init__(raw_stream, buffer_size=bad_size)
        raw_stream = self.MockRawIO([b"abc"])
        reader.__init__(raw_stream)
        self.assertEqual(b"abc", reader.read())

    def test_read(self):
        for size in (None, 7):
            raw_stream = self.MockRawIO((b"abc", b"d", b"efg"))
            reader = self.tp(raw_stream)
            self.assertEqual(b"abcdefg", reader.read(size))
        # Invalid args
        with self.assertRaises(ValueError):
            reader.read(-2)

    def test_read1(self):
        raw_stream = self.MockRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw_stream)
        self.assertEqual(b"a", reader.read(1))
        # (requested size, expected data, expected raw read count so far)
        steps = (
            (1, b"b", 1),
            (100, b"c", 1),
            (100, b"d", 2),
            (100, b"efg", 3),
            (100, b"", 4),
        )
        for size, data, raw_reads in steps:
            self.assertEqual(data, reader.read1(size))
            self.assertEqual(raw_stream._reads, raw_reads)
        # Invalid args
        with self.assertRaises(ValueError):
            reader.read1(-1)

    def test_readinto(self):
        raw_stream = self.MockRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw_stream)
        target = bytearray(2)
        # (expected byte count, expected buffer contents afterwards)
        for nread, contents in ((2, b"ab"), (2, b"cd"), (2, b"ef"),
                                (1, b"gf"), (0, b"gf")):
            self.assertEqual(reader.readinto(target), nread)
            self.assertEqual(target, contents)
        raw_stream = self.MockRawIO((b"abc", None))
        reader = self.tp(raw_stream)
        for nread, contents in ((2, b"ab"), (1, b"cb")):
            self.assertEqual(reader.readinto(target), nread)
            self.assertEqual(target, contents)

    def test_readlines(self):
        # Every call gets a fresh reader over the same three chunks.
        def fresh_reader():
            return self.tp(self.MockRawIO((b"abc\n", b"d\n", b"ef")))
        self.assertEqual(fresh_reader().readlines(),
                         [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(fresh_reader().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(fresh_reader().readlines(None),
                         [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        payload = b"abcdefghi"
        total = len(payload)

        # [buffer size, sizes of buffered reads, expected raw read sizes]
        cases = [
            [100, [3, 1, 4, 8], [total, 0]],
            [100, [3, 3, 3], [total]],
            [4, [1, 2, 4, 2], [4, 4, 1]],
        ]

        for buffer_size, read_sizes, expected_raw_reads in cases:
            raw_file = self.MockFileIO(payload)
            reader = self.tp(raw_file, buffer_size=buffer_size)
            offset = 0
            for nbytes in read_sizes:
                self.assertEqual(reader.read(nbytes),
                                 payload[offset:offset + nbytes])
                offset += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(raw_file.read_history, expected_raw_reads)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        raw_stream = self.MockRawIO(
            (b"abc", b"d", None, b"efg", None, None, None))
        reader = self.tp(raw_stream)
        self.assertEqual(b"abcd", reader.read(6))
        self.assertEqual(b"e", reader.read(1))
        self.assertEqual(b"fg", reader.read())
        self.assertEqual(b"", reader.peek(1))
        self.assertIsNone(reader.read())
        self.assertEqual(b"", reader.read())

        raw_stream = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", raw_stream.readall())
        self.assertIsNone(raw_stream.readall())

    def test_read_past_eof(self):
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", reader.read(9000))

    def test_read_all(self):
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", reader.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(values)
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def read_worker():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            chunk = bufio.read(n)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=read_worker)
                           for _ in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                collected = b''.join(results)
                # Every byte value must show up exactly N times overall.
                for i in range(256):
                    self.assertEqual(collected.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        # Same checks before and after a read.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        unsupported = self.UnsupportedOperation
        with self.assertRaises(unsupported):
            bufio.tell()
        with self.assertRaises(unsupported):
            bufio.seek(0)
        bufio.read(1)
        with self.assertRaises(unsupported):
            bufio.seek(0)
        with self.assertRaises(unsupported):
            bufio.tell()

    def test_misbehaved_io(self):
        raw_stream = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw_stream)
        with self.assertRaises(IOError):
            reader.seek(0)
        with self.assertRaises(IOError):
            reader.tell()

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            layouts = (
                # Simple case: one raw read is enough to satisfy the request.
                [b"x" * n],
                # A more complex case where two raw reads are needed to
                # satisfy the request.
                [b"x" * (n - 1), b"x"],
            )
            for chunks in layouts:
                rawio = self.MockRawIO(chunks)
                bufio = self.tp(rawio, bufsize)
                self.assertEqual(bufio.read(n), b"x" * n)
                self.assertEqual(rawio._extraneous_reads, 0,
                    "failed for {}: {} != 0".format(
                        n, rawio._extraneous_reads))
986
987
class CBufferedReaderTest(BufferedReaderTest):
    # Runs the BufferedReaderTest cases against io.BufferedReader and adds
    # checks specific to that implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each rejected re-initialisation the object must refuse
        # to read.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # Opening the raw file creates support.TESTFN on disk; make sure it
        # is removed again whatever the outcome of the test.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # The self-reference puts the object in a reference cycle.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001028
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the inherited BufferedReaderTest cases against
    # pyio.BufferedReader; ``tp`` is the type those tests instantiate.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001031
Guido van Rossuma9e20242007-03-08 00:43:48 +00001032
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Writer-side tests; subclasses bind ``tp`` to the type under test.
    write_mode = "wb"

    def test_constructor(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Re-initialising a live object is accepted for valid sizes...
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        # ...and rejected for non-positive ones.
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # Nothing reaches the raw stream until detach() is called.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # intermediate_func is called with the buffered object after every
        # write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            # Clamp the final write to whatever is left.
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Both helpers move the position around and put it back where it
        # was: the first with absolute seeks, the second with relative ones.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_destructor(self):
        # Destroying the buffered object must push pending data to the
        # raw stream.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # The test creates support.TESTFN on disk; remove it afterwards so
        # it does not leak into other tests.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Every byte value must have been written exactly N times.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing a third positional argument must emit the deprecation
        # warning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001248
1249
class CBufferedWriterTest(BufferedWriterTest):
    # Runs the BufferedWriterTest cases against io.BufferedWriter and adds
    # checks specific to that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each rejected re-initialisation the object must refuse
        # to write.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The test creates support.TESTFN on disk; make sure it is removed
        # again whatever the outcome.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # The self-reference puts the object in a reference cycle.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1287
1288
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the inherited BufferedWriterTest cases against
    # pyio.BufferedWriter; ``tp`` is the type those tests instantiate.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001291
class BufferedRWPairTest(unittest.TestCase):
    # Tests for the reader/writer pair type; subclasses bind ``tp`` to the
    # type under test.

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_detach(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_deprecation(self):
        # Passing a fourth positional argument must emit the deprecation
        # warning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        # A fresh pair is built for every call so each starts at offset 0.
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        # An explicit None hint must behave like no hint at all.
        self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        # peek() must not consume what a following read() returns.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        # The pair must report a tty as soon as either side does.
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001409
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the inherited BufferedRWPairTest cases against
    # io.BufferedRWPair; ``tp`` is the type those tests instantiate.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001412
class PyBufferedRWPairTest(BufferedRWPairTest):
    """Run the BufferedRWPair tests against ``pyio.BufferedRWPair``."""

    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001415
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001416
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    """Tests for buffered read/write (random access) streams.

    Reuses the reader and writer test suites and adds checks that mix
    reads, writes and seeks on a single object.  Concrete subclasses set
    ``tp`` to the class under test.
    """

    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Both inherited constructor checks apply to a read/write object.
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_constructor(self)

    def test_read_and_write(self):
        mock_raw = self.MockRawIO((b"asdf", b"ghjk"))
        stream = self.tp(mock_raw, 8)

        self.assertEqual(b"as", stream.read(2))
        stream.write(b"ddd")
        stream.write(b"eee")
        self.assertFalse(mock_raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", stream.read())
        # The two buffered writes reach the raw stream as a single chunk.
        self.assertEqual(b"dddeee", mock_raw._write_stack[0])

    def test_seek_and_tell(self):
        backing = self.BytesIO(b"asdfghjkl")
        stream = self.tp(backing)

        self.assertEqual(b"as", stream.read(2))
        self.assertEqual(2, stream.tell())
        stream.seek(0, 0)
        self.assertEqual(b"asdf", stream.read(4))

        # Overwrite four bytes in the middle, then re-read from the start.
        stream.write(b"123f")
        stream.seek(0, 0)
        self.assertEqual(b"asdf123fl", stream.read())
        self.assertEqual(9, stream.tell())
        stream.seek(-4, 2)
        self.assertEqual(5, stream.tell())
        stream.seek(2, 1)
        self.assertEqual(7, stream.tell())
        self.assertEqual(b"fl", stream.read(11))
        stream.flush()
        self.assertEqual(b"asdf123fl", backing.getvalue())

        # A float offset is rejected.
        self.assertRaises(TypeError, stream.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Shared body of the test_flush_and_* tests.  read_func(bufio[, n])
        # performs the read in the flavour being tested and returns bytes.
        backing = self.BytesIO(b"abcdefghi")
        stream = self.tp(backing)

        self.assertEqual(b"ab", read_func(stream, 2))
        stream.write(b"12")
        self.assertEqual(b"ef", read_func(stream, 2))
        self.assertEqual(6, stream.tell())
        stream.flush()
        self.assertEqual(6, stream.tell())
        self.assertEqual(b"ghi", read_func(stream))
        # Change the raw stream directly, bypassing the buffered object.
        backing.seek(0, 0)
        backing.write(b"XYZ")
        # flush() resets the read buffer
        stream.flush()
        stream.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(stream, 3))

    def test_flush_and_read(self):
        def plain_read(bufio, *args):
            return bufio.read(*args)
        self.check_flush_and_read(plain_read)

    def test_flush_and_readinto(self):
        def read_via_readinto(bufio, n=-1):
            # With no explicit size, use a target larger than the stream.
            target = bytearray(n if n >= 0 else 9999)
            count = bufio.readinto(target)
            return bytes(target[:count])
        self.check_flush_and_read(read_via_readinto)

    def test_flush_and_peek(self):
        def peek_and_advance(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            data = bufio.peek(n)
            if n != -1:
                data = data[:n]
            # peek() does not move the position, so advance it by hand.
            bufio.seek(len(data), 1)
            return data
        self.check_flush_and_read(peek_and_advance)

    def test_flush_and_write(self):
        backing = self.BytesIO(b"abcdefghi")
        stream = self.tp(backing)

        stream.write(b"123")
        stream.flush()
        stream.write(b"45")
        stream.flush()
        stream.seek(0, 0)
        self.assertEqual(b"12345fghi", backing.getvalue())
        self.assertEqual(b"12345fghi", stream.read())

    def test_threads(self):
        # Run the threading checks of both parent test classes.
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_threads(self)

    def test_writes_and_peek(self):
        def peek_in_place(bufio):
            bufio.peek(1)
        self.check_writes(peek_in_place)

        def peek_previous_byte(bufio):
            # Step back one byte, peek, then restore the position.
            saved = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(saved, 0)
        self.check_writes(peek_previous_byte)

    def test_writes_and_reads(self):
        def reread_last_byte(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(reread_last_byte)

    def test_writes_and_read1s(self):
        def reread_last_byte(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(reread_last_byte)

    def test_writes_and_readintos(self):
        def reread_last_byte(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(reread_last_byte)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            backing = self.BytesIO(b"A" * 10)
            stream = self.tp(backing, 4)
            # Trigger readahead
            self.assertEqual(stream.read(1), b"A")
            self.assertEqual(stream.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            stream.write(b"B" * overwrite_size)
            end_of_write = overwrite_size + 1
            self.assertEqual(stream.tell(), end_of_write)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            stream.flush()
            self.assertEqual(stream.tell(), end_of_write)
            expected = (b"A" + b"B" * overwrite_size
                        + b"A" * (9 - overwrite_size))
            self.assertEqual(backing.getvalue(), expected)

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        original = b"\x80\x81\x82\x83\x84"
        for first in range(len(original)):
            for second in range(first, len(original)):
                backing = self.BytesIO(original)
                stream = self.tp(backing, 100)
                mutate(stream, first, second)
                stream.flush()
                # When first == second the later write (1) must win.
                expected = bytearray(original)
                expected[second] = 2
                expected[first] = 1
                self.assertEqual(backing.getvalue(), expected,
                                 "failed result for i=%d, j=%d"
                                 % (first, second))

    def test_truncate_after_read_or_write(self):
        backing = self.BytesIO(b"A" * 10)
        stream = self.tp(backing, 100)
        self.assertEqual(stream.read(2), b"AA") # the read buffer gets filled
        self.assertEqual(stream.truncate(), 2)
        self.assertEqual(stream.write(b"BB"), 2) # the write buffer increases
        self.assertEqual(stream.truncate(), 4)

    def test_misbehaved_io(self):
        # Run the misbehaved-raw-stream checks of both parent test classes.
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as backing:
            with self.tp(backing, 100) as stream:
                # Alternate one-byte writes with each kind of read.
                stream.write(b"1")
                self.assertEqual(stream.read(1), b'b')
                stream.write(b'2')
                self.assertEqual(stream.read1(1), b'd')
                stream.write(b'3')
                one_byte = bytearray(1)
                stream.readinto(one_byte)
                self.assertEqual(one_byte, b'f')
                stream.write(b'4')
                self.assertEqual(stream.peek(1), b'h')
                stream.flush()
                self.assertEqual(backing.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as backing:
            with self.tp(backing, 100) as stream:
                self.assertEqual(stream.read(1), b'a')
                stream.write(b"2")
                self.assertEqual(stream.read(1), b'c')
                stream.flush()
                self.assertEqual(backing.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        with self.BytesIO(b'ab\ncdef\ng\n') as backing:
            with self.tp(backing) as stream:
                # Each write overwrites the first byte of the next line.
                stream.write(b'1')
                self.assertEqual(stream.readline(), b'b\n')
                stream.write(b'2')
                self.assertEqual(stream.readline(), b'def\n')
                stream.write(b'3')
                self.assertEqual(stream.readline(), b'\n')
                stream.flush()
                self.assertEqual(backing.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None
1640
class CBufferedRandomTest(BufferedRandomTest):
    """Run the BufferedRandom tests against ``io.BufferedRandom``."""

    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        # Re-initialising with a sys.maxsize buffer size has to fail.
        allocation_errors = (OverflowError, MemoryError, ValueError)
        self.assertRaises(allocation_errors,
                          buffered.__init__, raw, sys.maxsize)

    def test_garbage_collection(self):
        # Run the garbage-collection checks of both C test classes.
        for parent in (CBufferedReaderTest, CBufferedWriterTest):
            parent.test_garbage_collection(self)
1657
class PyBufferedRandomTest(BufferedRandomTest):
    """Run the BufferedRandom tests against ``pyio.BufferedRandom``."""

    tp = pyio.BufferedRandom
1660
1661
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001662# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1663# properties:
1664# - A single output character can correspond to many bytes of input.
1665# - The number of input bytes to complete the character can be
1666# undetermined until the last input byte is received.
1667# - The number of input bytes can vary depending on previous input.
1668# - A single input byte can correspond to many characters of output.
1669# - The number of output characters can be undetermined until the
1670# last input byte is received.
1671# - The number of output characters can vary depending on previous input.
1672
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  Input bytes are buffered until a word
    is complete and the buffer is emptied after each word.  EOF also
    terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and an empty buffer."""
        self.buffer = bytearray()
        self.i = 1
        self.o = 1

    def getstate(self):
        """Return (pending bytes, flags), with I and O packed into flags."""
        # XOR with 1 so that flags == 0 right after reset().
        in_flag = self.i ^ 1
        out_flag = self.o ^ 1
        return bytes(self.buffer), in_flag * 100 + out_flag

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        pending, flags = state
        self.buffer = bytearray(pending)
        in_flag, out_flag = divmod(flags, 100)
        self.i = in_flag ^ 1
        self.o = out_flag ^ 1

    def decode(self, input, final=False):
        """Feed the bytes of *input*; return the text of completed words."""
        pieces = []
        period = ord('.')
        for byte in input:
            if self.i != 0:
                # fixed-length, terminate after self.i bytes
                self.buffer.append(byte)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
            elif byte != period:
                # variable-length: accumulate until a period arrives
                self.buffer.append(byte)
            elif self.buffer:
                # a period ends the pending word; extra periods are skipped
                pieces.append(self.process_word())
        if final and self.buffer:
            # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        'i...' and 'o...' words update I and O and produce no output.
        """
        command = self.buffer[0]
        argument = self.buffer[1:]
        result = ''
        if command == ord('i'):
            # set input length
            self.i = min(99, int(argument or 0))
        elif command == ord('o'):
            # set output length
            self.o = min(99, int(argument or 0))
        else:
            result = self.buffer.decode('ascii')
            if len(result) < self.o:
                # pad out with hyphens (any excess is cut off just below)
                result += '-' * self.o
            if self.o:
                # truncate to output length
                result = result[:self.o]
            result += '.'
        self.buffer = bytearray()
        return result

    # Lookups for 'test_decoder' only succeed while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: serve 'test_decoder' while enabled.

        Returns None (implicitly) for any other name or while disabled.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
1757
# Register the lookup function of the decoder defined above with codecs.
# It answers nothing while codecEnabled is false (the default); tests
# enable it when they need the 'test_decoder' codec.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1761
1762
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.'
         'b----------------------------.'
         'cde--------------------------.'
         'abcdefghijabcde.'
         'a.b------------.'
         '.c.------------.'
         'd.e------------.'
         'k--------------.'
         'l--------------.'
         'm--------------.'),
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for data, at_eof, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(data, at_eof), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001805
1806class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001807
def setUp(self):
    # Start each test without a leftover scratch file.
    support.unlink(support.TESTFN)
    # Sample bytes mixing "\r\n", "\r" and "\n" line endings ...
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    # ... and the same text with every line ending written as "\n".
    self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001812
def tearDown(self):
    # Remove the scratch file a test may have created.
    scratch_path = support.TESTFN
    support.unlink(scratch_path)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001815
def test_constructor(self):
    # Re-running __init__ on an existing wrapper must apply the new
    # settings; invalid ``newline`` arguments must be rejected.
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    text = self.TextIOWrapper(buffered)

    text.__init__(buffered, encoding="latin-1", newline="\r\n")
    self.assertEqual(text.encoding, "latin-1")
    self.assertEqual(text.line_buffering, False)

    text.__init__(buffered, encoding="utf-8", line_buffering=True)
    self.assertEqual(text.encoding, "utf-8")
    self.assertEqual(text.line_buffering, True)
    # The two raw bytes decode to a single character under utf-8.
    self.assertEqual("\xe9\n", text.readline())

    self.assertRaises(TypeError, text.__init__, buffered, newline=42)
    self.assertRaises(ValueError, text.__init__, buffered, newline='xyzzy')
1829
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)
    text = self.TextIOWrapper(buffered)
    # detach() hands back the very object that was wrapped.
    self.assertIs(text.detach(), buffered)

    text = self.TextIOWrapper(buffered, encoding="ascii")
    text.write("howdy")
    # Nothing has reached the raw stream before detach() ...
    self.assertFalse(raw.getvalue())
    text.detach()
    # ... and the pending text is there afterwards.
    self.assertEqual(raw.getvalue(), b"howdy")
    # Detaching a second time is an error.
    self.assertRaises(ValueError, text.detach)
1842
def test_repr(self):
    # The repr gains name and mode fields as those attributes are set.
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    text = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    expected = "<%s.TextIOWrapper encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    raw.name = "dummy"
    expected = "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    text.mode = "r"
    expected = ("<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>"
                % modname)
    self.assertEqual(repr(text), expected)

    # A bytes name is shown with its b'' prefix.
    raw.name = b"dummy"
    expected = ("<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>"
                % modname)
    self.assertEqual(repr(text), expected)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001859
def test_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    text = self.TextIOWrapper(buffered, newline="\n", line_buffering=True)

    text.write("X")
    self.assertEqual(raw.getvalue(), b"")  # No flush happened
    text.write("Y\nZ")
    self.assertEqual(raw.getvalue(), b"XY\nZ")  # All got flushed
    # A write containing "\r" is flushed through to the raw stream as well.
    text.write("A\rB")
    self.assertEqual(raw.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001870
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    raw = self.BytesIO()
    explicit = self.TextIOWrapper(raw, encoding="utf-8")
    self.assertEqual(explicit.encoding, "utf-8")

    # Without an explicit encoding a default must still be reported ...
    implicit = self.TextIOWrapper(raw)
    self.assertIsNotNone(implicit.encoding)
    # ... and it must name a codec that lookup() can resolve.
    codecs.lookup(implicit.encoding)
1879
def test_encoding_errors_reading(self):
    # b"\xff" cannot be decoded as ASCII; check each error handler.
    undecodable = b"abc\n\xff\n"

    # (1) default
    text = self.TextIOWrapper(self.BytesIO(undecodable), encoding="ascii")
    self.assertRaises(UnicodeError, text.read)

    # (2) explicit strict
    text = self.TextIOWrapper(self.BytesIO(undecodable), encoding="ascii",
                              errors="strict")
    self.assertRaises(UnicodeError, text.read)

    # (3) ignore
    text = self.TextIOWrapper(self.BytesIO(undecodable), encoding="ascii",
                              errors="ignore")
    self.assertEqual(text.read(), "abc\n\n")

    # (4) replace
    text = self.TextIOWrapper(self.BytesIO(undecodable), encoding="ascii",
                              errors="replace")
    self.assertEqual(text.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001897
def test_encoding_errors_writing(self):
    # "\xff" cannot be encoded as ASCII; check each error handler.

    # (1) default
    text = self.TextIOWrapper(self.BytesIO(), encoding="ascii")
    self.assertRaises(UnicodeError, text.write, "\xff")

    # (2) explicit strict
    text = self.TextIOWrapper(self.BytesIO(), encoding="ascii",
                              errors="strict")
    self.assertRaises(UnicodeError, text.write, "\xff")

    # (3) ignore
    sink = self.BytesIO()
    text = self.TextIOWrapper(sink, encoding="ascii", errors="ignore",
                              newline="\n")
    text.write("abc\xffdef\n")
    text.flush()
    self.assertEqual(sink.getvalue(), b"abcdef\n")

    # (4) replace
    sink = self.BytesIO()
    text = self.TextIOWrapper(sink, encoding="ascii", errors="replace",
                              newline="\n")
    text.write("abc\xffdef\n")
    text.flush()
    self.assertEqual(sink.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001921
def test_newlines(self):
    input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]

    # [newline argument, lines expected when reading with it]
    tests = [
        [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
        [ '', input_lines ],
        [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
        [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
        [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    def collect_lines(textio, do_reads):
        # Either iterate the stream directly, or start every line with a
        # two-character read() and finish it with readline().
        if not do_reads:
            return list(textio)
        collected = []
        while True:
            head = textio.read(2)
            if head == '':
                return collected
            self.assertEqual(len(head), 2)
            collected.append(head + textio.readline())

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        data = ''.join(input_lines).encode(encoding)
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    got_lines = collect_lines(textio, do_reads)

                    for got_line, exp_line in zip(got_lines, exp_lines):
                        self.assertEqual(got_line, exp_line)
                    self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00001963
def test_newlines_input(self):
    # Read the same bytes under every ``newline`` setting and compare
    # both readlines() and read() against the expected lines.
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    cases = [
        (None, normalized.decode("ascii").splitlines(keepends=True)),
        ("", testdata.decode("ascii").splitlines(keepends=True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, expected in cases:
        source = self.BytesIO(testdata)
        text = self.TextIOWrapper(source, encoding="ascii", newline=newline)
        self.assertEqual(text.readlines(), expected)
        text.seek(0)
        self.assertEqual(text.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00001979
def test_newlines_output(self):
    # Bytes expected on output for each explicit ``newline`` setting.
    testdict = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
    }
    # newline=None is expected to behave like the platform's os.linesep.
    tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
    chunks = ("AAA\nB", "BB\nCCC\n", "X\rY\r\nZ")
    for newline, expected in tests:
        sink = self.BytesIO()
        text = self.TextIOWrapper(sink, encoding="ascii", newline=newline)
        for chunk in chunks:
            text.write(chunk)
        text.flush()
        self.assertEqual(sink.closed, False)
        self.assertEqual(sink.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001997
def test_destructor(self):
    # Dropping the last reference to the wrapper must close the
    # underlying stream with the written text already in it.
    seen_at_close = []
    base = self.BytesIO

    class MyBytesIO(base):
        def close(self):
            # Record the contents at the moment of closing.
            seen_at_close.append(self.getvalue())
            base.close(self)

    raw = MyBytesIO()
    text = self.TextIOWrapper(raw, encoding="ascii")
    text.write("abc")
    del text
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002011
def test_override_destructor(self):
    # Overridden __del__, close() and flush() must be called, in that
    # order, when the wrapper is destroyed.
    calls = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # The base class may not define __del__ at all.
            try:
                parent_del = super().__del__
            except AttributeError:
                pass
            else:
                parent_del()

        def close(self):
            calls.append(2)
            super().close()

        def flush(self):
            calls.append(3)
            super().flush()

    raw = self.BytesIO()
    text = MyTextIO(raw, encoding="ascii")
    del text
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2034
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()

    def touch_missing_attribute():
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    message = captured.getvalue().strip()
    if not message:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(message.splitlines()), 1)
    self.assertTrue(message.startswith("Exception IOError: "), message)
    self.assertTrue(message.endswith(" ignored"), message)
Guido van Rossum8358db22007-08-18 21:39:55 +00002049
Guido van Rossum9b76da62007-04-11 01:09:03 +00002050 # Systematic tests of the text I/O API
2051
def test_basic_io(self):
    # Write, seek, tell and re-read a small text file for a range of
    # chunk sizes and encodings.
    #
    # Both file objects are managed by ``with`` blocks so that the file
    # is closed even when an assertion fails part-way through; otherwise
    # the open handle would leak into tearDown(), which unlinks the file.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        # NOTE: "utf-16-be" and "utf-16-le" are deliberately not covered.
        for enc in "ascii", "latin-1", "utf-8":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)

            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Opaque position of the end of the initial contents.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                # Reads at end of file return the empty string.
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                # Append, then read back only the appended part.
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                # The multi-line sample contains non-Latin-1 characters,
                # so it is only run for the utf encodings.
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2080
def multi_line_test(self, f, enc):
    # Helper (not itself a test): rewrite *f* with lines of many different
    # lengths, recording tell() before each write, then check that reading
    # the lines back yields the same (position, line) pairs.
    # *enc* is accepted but not used here.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    period = len(sample)

    written = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to build a line of ``size``.
        line = "".join(sample[i % period] for i in range(size)) + "\n"
        written.append((f.tell(), line))
        f.write(line)

    f.seek(0)
    read_back = []
    while True:
        pos = f.tell()
        line = f.readline()
        if not line:
            break
        read_back.append((pos, line))
    self.assertEqual(read_back, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002102
def test_telling(self):
    # tell() must report the same positions while reading lines back as
    # it did while they were written, and must be refused while the file
    # is being iterated over.
    # The context manager closes the file even if an assertion fails.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is expected to fail in the middle of iteration.
            self.assertRaises(IOError, f.tell)
        # Once iteration has finished, tell() works again.
        self.assertEqual(f.tell(), p2)
2122
def test_seeking(self):
    # Build a line whose multi-byte character sits two characters before
    # the default chunk size, write it twice, then check read()/tell()/
    # readline() around that point.
    prefix_size = _default_chunk_size() - 2
    text_prefix = "a" * prefix_size
    raw_prefix = bytes(text_prefix.encode("utf-8"))
    # The prefix is pure ASCII, so character and byte counts agree.
    self.assertEqual(len(text_prefix), len(raw_prefix))
    text_suffix = "\u8888\n"
    raw_line = raw_prefix + bytes(text_suffix.encode("utf-8"))
    with self.open(support.TESTFN, "wb") as f:
        f.write(raw_line * 2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        head = f.read(prefix_size)
        self.assertEqual(head, str(raw_prefix, "ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), text_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002139
def test_seeking_too(self):
    # Regression test for a specific bug: readline() followed by tell()
    # on a three-byte UTF-8 character with a chunk size of 2.
    payload = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as out:
        out.write(payload)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        # No assertion: the call itself must simply not raise.
        f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002150
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened in a `with` block so it is closed even
        # when an assertion fails part-way through.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    # Seeking back to the cookie must restore the
                    # decoder so the rest of the data reads the same.
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.  (`raw` rather than `input`, so the builtin
        # is not shadowed.)
        for raw, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(raw)

        # Position each test case so that it crosses a chunk boundary.
        for raw, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(raw)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + raw, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002197
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002198 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002199 data = "1234567890"
2200 tests = ("utf-16",
2201 "utf-16-le",
2202 "utf-16-be",
2203 "utf-32",
2204 "utf-32-le",
2205 "utf-32-be")
2206 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002207 buf = self.BytesIO()
2208 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002209 # Check if the BOM is written only once (see issue1753).
2210 f.write(data)
2211 f.write(data)
2212 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002213 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002214 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002215 self.assertEqual(f.read(), data * 2)
2216 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002217
Benjamin Petersona1b49012009-03-31 23:11:32 +00002218 def test_unreadable(self):
2219 class UnReadable(self.BytesIO):
2220 def readable(self):
2221 return False
2222 txt = self.TextIOWrapper(UnReadable())
2223 self.assertRaises(IOError, txt.read)
2224
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002225 def test_read_one_by_one(self):
2226 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002227 reads = ""
2228 while True:
2229 c = txt.read(1)
2230 if not c:
2231 break
2232 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002233 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002234
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002235 def test_readlines(self):
2236 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2237 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2238 txt.seek(0)
2239 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2240 txt.seek(0)
2241 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2242
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002243 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002244 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002245 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002246 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002247 reads = ""
2248 while True:
2249 c = txt.read(128)
2250 if not c:
2251 break
2252 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002253 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002254
2255 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002256 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002257
2258 # read one char at a time
2259 reads = ""
2260 while True:
2261 c = txt.read(1)
2262 if not c:
2263 break
2264 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002265 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002266
2267 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002268 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002269 txt._CHUNK_SIZE = 4
2270
2271 reads = ""
2272 while True:
2273 c = txt.read(4)
2274 if not c:
2275 break
2276 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002277 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002278
2279 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002280 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002281 txt._CHUNK_SIZE = 4
2282
2283 reads = txt.read(4)
2284 reads += txt.read(4)
2285 reads += txt.readline()
2286 reads += txt.readline()
2287 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002288 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002289
2290 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002291 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002292 txt._CHUNK_SIZE = 4
2293
2294 reads = txt.read(4)
2295 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002296 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002297
2298 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002299 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002300 txt._CHUNK_SIZE = 4
2301
2302 reads = txt.read(4)
2303 pos = txt.tell()
2304 txt.seek(0)
2305 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002306 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002307
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002308 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002309 buffer = self.BytesIO(self.testdata)
2310 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002311
2312 self.assertEqual(buffer.seekable(), txt.seekable())
2313
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    path = support.TESTFN

    def file_bytes():
        # Raw content of the test file.
        with self.open(path, 'rb') as raw:
            return raw.read()

    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as f:
            f.write('aaa')
            pos = f.tell()
        self.assertEqual(file_bytes(), 'aaa'.encode(charset))

        with self.open(path, 'a', encoding=charset) as f:
            f.write('xxx')
        # One encode of the whole text yields exactly one BOM.
        self.assertEqual(file_bytes(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002328
def test_seek_bom(self):
    # Same test, but when seeking manually
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as writer:
            writer.write('aaa')
            end_of_text = writer.tell()
        with self.open(path, 'r+', encoding=charset) as editor:
            # Write at the end first, then overwrite from the start.
            for position, text in ((end_of_text, 'zzz'), (0, 'bbb')):
                editor.seek(position)
                editor.write(text)
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002343
def test_errors_property(self):
    # The `errors` attribute is "strict" when open() is given no errors
    # argument, and otherwise echoes the value that was passed.
    scenarios = (({}, "strict"), ({"errors": "replace"}, "replace"))
    for extra, expected in scenarios:
        with self.open(support.TESTFN, "w", **extra) as f:
            self.assertEqual(f.errors, expected)
2349
@support.no_tracing
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    go = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def worker(index):
            # Format first, then wait, so that all threads call write()
            # as close together as possible once the event is set.
            text = "Thread%03d\n" % index
            go.wait()
            f.write(text)
        workers = [threading.Thread(target=worker, args=(x,))
                   for x in range(20)]
        for thread in workers:
            thread.start()
        # Give the threads a moment to reach go.wait().
        time.sleep(0.02)
        go.set()
        for thread in workers:
            thread.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
        # Each thread's line must appear exactly once.
        for n in range(20):
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002372
Antoine Pitrou6be88762010-05-03 16:48:20 +00002373 def test_flush_error_on_close(self):
2374 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2375 def bad_flush():
2376 raise IOError()
2377 txt.flush = bad_flush
2378 self.assertRaises(IOError, txt.close) # exception not swallowed
2379
2380 def test_multi_close(self):
2381 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2382 txt.close()
2383 txt.close()
2384 txt.close()
2385 self.assertRaises(ValueError, txt.flush)
2386
def test_unseekable(self):
    # tell() and seek() on a wrapper around MockUnseekableIO must raise
    # UnsupportedOperation.
    wrapper = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    unsupported = self.UnsupportedOperation
    self.assertRaises(unsupported, wrapper.tell)
    self.assertRaises(unsupported, wrapper.seek, 0)
2391
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002392 def test_readonly_attributes(self):
2393 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2394 buf = self.BytesIO(self.testdata)
2395 with self.assertRaises(AttributeError):
2396 txt.buffer = buf
2397
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    wrapper = self.TextIOWrapper(self.MockRawIO(chunks),
                                 encoding='ascii', newline='\n')
    # Reads: a sized read, a line that spans two raw chunks, then
    # iteration over what is left.
    self.assertEqual(wrapper.read(4), 'abcd')
    self.assertEqual(wrapper.readline(), 'efghi\n')
    remaining = [line for line in wrapper]
    self.assertEqual(remaining, ['jkl\n', 'opq\n'])
2408
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                                 write_through=True)
    for piece in ('1', '23\n4', '5'):
        wrapper.write(piece)
    # No flush() was called, yet every byte must have reached the raw
    # object's write stack.
    self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2418
class CTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapperTest plus checks aimed at the C TextIOWrapper."""

    def test_initialization(self):
        # After __init__ is called again with a bad `newline` argument,
        # read() on the wrapper is expected to raise ValueError.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        rejected = ((TypeError, 42), (ValueError, 'xyzzy'))
        for expected_error, newline in rejected:
            self.assertRaises(expected_error, wrapper.__init__, buffered,
                              newline=newline)
            self.assertRaises(ValueError, wrapper.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        raw = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(raw)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        # Self-reference: only the cycle collector can free the wrapper.
        wrapper.x = wrapper
        watcher = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertTrue(watcher() is None, watcher)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        def new_wrapper():
            pair = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            return self.TextIOWrapper(pair, encoding="ascii")

        for _ in range(1000):
            first = new_wrapper()
            second = new_wrapper()
            # circular references
            first.buddy = second
            second.buddy = first
        support.gc_collect()
2459
2460
class PyTextIOWrapperTest(TextIOWrapperTest):
    """Inherit every TextIOWrapperTest test unchanged.

    Adds no tests of its own; presumably bound to the pure-Python
    implementation elsewhere in the file (not visible here).
    """
2463
2464
2465class IncrementalNewlineDecoderTest(unittest.TestCase):
2466
2467 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002468 # UTF-8 specific tests for a newline decoder
2469 def _check_decode(b, s, **kwargs):
2470 # We exercise getstate() / setstate() as well as decode()
2471 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002472 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002473 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002474 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002475
Antoine Pitrou180a3362008-12-14 16:36:46 +00002476 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002477
Antoine Pitrou180a3362008-12-14 16:36:46 +00002478 _check_decode(b'\xe8', "")
2479 _check_decode(b'\xa2', "")
2480 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002481
Antoine Pitrou180a3362008-12-14 16:36:46 +00002482 _check_decode(b'\xe8', "")
2483 _check_decode(b'\xa2', "")
2484 _check_decode(b'\x88', "\u8888")
2485
2486 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002487 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2488
Antoine Pitrou180a3362008-12-14 16:36:46 +00002489 decoder.reset()
2490 _check_decode(b'\n', "\n")
2491 _check_decode(b'\r', "")
2492 _check_decode(b'', "\n", final=True)
2493 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002494
Antoine Pitrou180a3362008-12-14 16:36:46 +00002495 _check_decode(b'\r', "")
2496 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002497
Antoine Pitrou180a3362008-12-14 16:36:46 +00002498 _check_decode(b'\r\r\n', "\n\n")
2499 _check_decode(b'\r', "")
2500 _check_decode(b'\r', "\n")
2501 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002502
Antoine Pitrou180a3362008-12-14 16:36:46 +00002503 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2504 _check_decode(b'\xe8\xa2\x88', "\u8888")
2505 _check_decode(b'\n', "\n")
2506 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2507 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002508
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002509 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002510 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002511 if encoding is not None:
2512 encoder = codecs.getincrementalencoder(encoding)()
2513 def _decode_bytewise(s):
2514 # Decode one byte at a time
2515 for b in encoder.encode(s):
2516 result.append(decoder.decode(bytes([b])))
2517 else:
2518 encoder = None
2519 def _decode_bytewise(s):
2520 # Decode one char at a time
2521 for c in s:
2522 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002523 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002524 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002525 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00002526 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002527 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002528 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002529 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002530 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002531 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002532 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002533 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002534 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002535 input = "abc"
2536 if encoder is not None:
2537 encoder.reset()
2538 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002539 self.assertEqual(decoder.decode(input), "abc")
2540 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002541
2542 def test_newline_decoder(self):
2543 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002544 # None meaning the IncrementalNewlineDecoder takes unicode input
2545 # rather than bytes input
2546 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002547 'utf-16', 'utf-16-le', 'utf-16-be',
2548 'utf-32', 'utf-32-le', 'utf-32-be',
2549 )
2550 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002551 decoder = enc and codecs.getincrementaldecoder(enc)()
2552 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2553 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002554 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002555 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2556 self.check_newline_decoding_utf8(decoder)
2557
Antoine Pitrou66913e22009-03-06 23:40:56 +00002558 def test_newline_bytes(self):
2559 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2560 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002561 self.assertEqual(dec.newlines, None)
2562 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2563 self.assertEqual(dec.newlines, None)
2564 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2565 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00002566 dec = self.IncrementalNewlineDecoder(None, translate=False)
2567 _check(dec)
2568 dec = self.IncrementalNewlineDecoder(None, translate=True)
2569 _check(dec)
2570
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Inherit every IncrementalNewlineDecoderTest test unchanged.

    Adds no tests of its own; presumably bound to the C implementation
    elsewhere in the file (not visible here).
    """
2573
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Inherit every IncrementalNewlineDecoderTest test unchanged.

    Adds no tests of its own; presumably bound to the pure-Python
    implementation elsewhere in the file (not visible here).
    """
Antoine Pitrou180a3362008-12-14 16:36:46 +00002576
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002577
Guido van Rossum01a27522007-03-07 01:00:12 +00002578# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002579
Guido van Rossum5abbf752007-08-27 17:39:33 +00002580class MiscIOTest(unittest.TestCase):
2581
def tearDown(self):
    # Remove the scratch file that the tests in this class write to.
    scratch = support.TESTFN
    support.unlink(scratch)
2584
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002585 def test___all__(self):
2586 for name in self.io.__all__:
2587 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002588 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002589 if name == "open":
2590 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002591 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002592 self.assertTrue(issubclass(obj, Exception), name)
2593 elif not name.startswith("SEEK_"):
2594 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002595
def test_attributes(self):
    # Check the `name` and `mode` attributes on every layer of the
    # objects returned by open().
    # All files are opened in `with` blocks so that none stays open if
    # an assertion fails.
    with self.open(support.TESTFN, "wb", buffering=0) as f:
        self.assertEqual(f.mode, "wb")

    with self.open(support.TESTFN, "U") as f:
        self.assertEqual(f.name, support.TESTFN)
        self.assertEqual(f.buffer.name, support.TESTFN)
        self.assertEqual(f.buffer.raw.name, support.TESTFN)
        self.assertEqual(f.mode, "U")
        self.assertEqual(f.buffer.mode, "rb")
        self.assertEqual(f.buffer.raw.mode, "rb")

    with self.open(support.TESTFN, "w+") as f:
        self.assertEqual(f.mode, "w+")
        self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
        self.assertEqual(f.buffer.raw.mode, "rb+")

        # A second file object over the same descriptor; closefd=False
        # leaves `f` as the only owner of the descriptor.
        with self.open(f.fileno(), "wb", closefd=False) as g:
            self.assertEqual(g.mode, "wb")
            self.assertEqual(g.raw.mode, "wb")
            self.assertEqual(g.name, f.fileno())
            self.assertEqual(g.raw.name, f.fileno())
2622
def test_io_after_close(self):
    # Every I/O method of a closed file object must raise ValueError,
    # for each combination of mode and buffering below.
    configurations = []
    for base in ("w", "r", "w+"):
        # The "w" group comes first, so the file already exists by the
        # time the "r" group opens it.
        configurations += [
            {"mode": base},
            {"mode": base + "b"},
            {"mode": base, "buffering": 1},
            {"mode": base, "buffering": 2},
            {"mode": base + "b", "buffering": 0},
        ]
    for kwargs in configurations:
        f = self.open(support.TESTFN, **kwargs)
        f.close()
        for name in ("flush", "fileno", "isatty", "__iter__"):
            self.assertRaises(ValueError, getattr(f, name))
        # Not every layer has these methods; check them when present.
        if hasattr(f, "peek"):
            self.assertRaises(ValueError, f.peek, 1)
        self.assertRaises(ValueError, f.read)
        if hasattr(f, "read1"):
            self.assertRaises(ValueError, f.read1, 1024)
        if hasattr(f, "readall"):
            self.assertRaises(ValueError, f.readall)
        if hasattr(f, "readinto"):
            self.assertRaises(ValueError, f.readinto, bytearray(1024))
        for name, call_args in (("readline", ()), ("readlines", ()),
                                ("seek", (0,)), ("tell", ()),
                                ("truncate", ())):
            self.assertRaises(ValueError, getattr(f, name), *call_args)
        # write() needs bytes for binary files and str for text files.
        nothing = b"" if "b" in kwargs['mode'] else ""
        self.assertRaises(ValueError, f.write, nothing)
        self.assertRaises(ValueError, f.writelines, [])
        self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002665
def test_blockingioerror(self):
    # Various BlockingIOError issues
    class Message(str):
        pass
    message = Message("")
    error = self.BlockingIOError(1, message)
    # Tie the exception and its argument into a reference cycle ...
    message.b = error
    error.c = message
    watcher = weakref.ref(message)
    del message, error
    # ... which the garbage collector must be able to break.
    support.gc_collect()
    self.assertTrue(watcher() is None, watcher)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002678
2679 def test_abcs(self):
2680 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002681 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2682 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2683 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2684 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002685
def _check_abc_inheritance(self, abcmodule):
    # Open the test file as a raw, a buffered and a text file and check
    # each object against the four ABCs exposed by `abcmodule`: it must
    # be an instance of the ABCs listed for it and of none of the others.
    abc_names = ("IOBase", "RawIOBase", "BufferedIOBase", "TextIOBase")
    scenarios = (
        ("wb", {"buffering": 0}, ("IOBase", "RawIOBase")),
        ("wb", {}, ("IOBase", "BufferedIOBase")),
        ("w", {}, ("IOBase", "TextIOBase")),
    )
    for mode, extra, expected in scenarios:
        with self.open(support.TESTFN, mode, **extra) as f:
            for abc_name in abc_names:
                base = getattr(abcmodule, abc_name)
                if abc_name in expected:
                    self.assertIsInstance(f, base)
                else:
                    self.assertNotIsInstance(f, base)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002702
2703 def test_abc_inheritance(self):
2704 # Test implementations inherit from their respective ABCs
2705 self._check_abc_inheritance(self)
2706
2707 def test_abc_inheritance_official(self):
2708 # Test implementations inherit from the official ABCs of the
2709 # baseline "io" module.
2710 self._check_abc_inheritance(io)
2711
def _check_warn_on_dealloc(self, *args, **kwargs):
    # Open a file with the builtin open(), drop the only reference to it
    # without closing it, and check that a ResourceWarning mentioning
    # the file's repr() is emitted.
    # NOTE(review): this calls the builtin open(), not self.open --
    # confirm that is intended.
    f = open(*args, **kwargs)
    description = repr(f)
    with self.assertWarns(ResourceWarning) as caught:
        del f
        support.gc_collect()
    self.assertIn(description, str(caught.warning.args[0]))
2719
def test_warn_on_dealloc(self):
    # Raw, buffered and text files opened by name must all warn when
    # they are deallocated without having been closed.
    variants = (
        ("wb", {"buffering": 0}),
        ("wb", {}),
        ("w", {}),
    )
    for mode, extra in variants:
        self._check_warn_on_dealloc(support.TESTFN, mode, **extra)
2724
def _check_warn_on_dealloc_fd(self, *args, **kwargs):
    """Check dealloc warnings for files opened from a file descriptor.

    *args* and **kwargs* are forwarded to open() after the descriptor.
    A file that owns its descriptor must emit a ResourceWarning when it
    is deallocated unclosed; one opened with closefd=False must not.
    """
    fds = []
    def cleanup_fds():
        # Some descriptors were already closed by the file objects that
        # owned them; EBADF is therefore expected and ignored.
        for fd in fds:
            try:
                os.close(fd)
            except EnvironmentError as e:
                if e.errno != errno.EBADF:
                    raise
    self.addCleanup(cleanup_fds)
    r, w = os.pipe()
    fds += r, w
    self._check_warn_on_dealloc(r, *args, **kwargs)
    # When using closefd=False, there's no warning
    r, w = os.pipe()
    fds += r, w
    with warnings.catch_warnings(record=True) as recorded:
        # ResourceWarning is ignored by the default warning filters, so
        # without this the list would stay empty even if a warning were
        # emitted, and the assertion below would prove nothing.
        warnings.simplefilter("always", ResourceWarning)
        open(r, *args, closefd=False, **kwargs)
        support.gc_collect()
    self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00002745
def test_warn_on_dealloc_fd(self):
    # Same check as for named files, for raw, buffered and text readers
    # wrapped around a pipe descriptor.
    variants = (
        ("rb", {"buffering": 0}),
        ("rb", {}),
        ("r", {}),
    )
    for mode, extra in variants:
        self._check_warn_on_dealloc_fd(mode, **extra)
2750
2751
def test_pickling(self):
    # Pickling file objects is forbidden
    open_variants = []
    for text_mode, binary_mode in (("w", "wb"), ("r", "rb"), ("w+", "w+b")):
        # For each access mode: text, buffered binary, then raw binary.
        open_variants.append({"mode": text_mode})
        open_variants.append({"mode": binary_mode})
        open_variants.append({"mode": binary_mode, "buffering": 0})
    protocols = range(pickle.HIGHEST_PROTOCOL + 1)
    for kwargs in open_variants:
        for protocol in protocols:
            with self.open(support.TESTFN, **kwargs) as stream:
                with self.assertRaises(TypeError):
                    pickle.dumps(stream, protocol)
2768
@unittest.skipUnless(fcntl, 'fcntl required for this test')
def test_nonblock_pipe_write_bigbuf(self):
    # 16 KiB buffer for the non-blocking pipe write scenario.
    big_buffer = 16 * 1024
    self._test_nonblock_pipe_write(big_buffer)
2772
@unittest.skipUnless(fcntl, 'fcntl required for this test')
def test_nonblock_pipe_write_smallbuf(self):
    # 1 KiB buffer for the non-blocking pipe write scenario.
    small_buffer = 1024
    self._test_nonblock_pipe_write(small_buffer)
2776
def _set_non_blocking(self, fd):
    """Add O_NONBLOCK to the status flags of descriptor *fd*.

    Asserts that reading the flags does not yield -1 and that setting
    them yields 0.
    """
    current_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    self.assertNotEqual(current_flags, -1)
    wanted_flags = current_flags | os.O_NONBLOCK
    status = fcntl.fcntl(fd, fcntl.F_SETFL, wanted_flags)
    self.assertEqual(status, 0)
2782
def _test_nonblock_pipe_write(self, bufsize):
    """Write through a non-blocking pipe until it blocks, then compare.

    Both pipe ends are made non-blocking and wrapped in buffered files
    using *bufsize*.  Everything accepted by the writer must come out of
    the reader unchanged, and both files must be closed afterwards.
    """
    sent_chunks = []
    received_chunks = []
    read_fd, write_fd = os.pipe()
    self._set_non_blocking(read_fd)
    self._set_non_blocking(write_fd)

    # To exercise all code paths in the C implementation we need
    # to play with buffer sizes.  For instance, if we choose a
    # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
    # then we will never get a partial write of the buffer.
    rf = self.open(read_fd, mode='rb', closefd=True, buffering=bufsize)
    wf = self.open(write_fd, mode='wb', closefd=True, buffering=bufsize)

    with rf, wf:
        for chunk_len in 9999, 73, 7574:
            try:
                counter = 0
                while True:
                    # One repeated lowercase letter, cycling a..z.
                    chunk = bytes([counter % 26 + 97]) * chunk_len
                    sent_chunks.append(chunk)
                    wf.write(chunk)
                    counter += 1

            except self.BlockingIOError as exc:
                self.assertEqual(exc.args[0], errno.EAGAIN)
                self.assertEqual(exc.args[2], exc.characters_written)
                # Only the accepted prefix of the last chunk was sent.
                accepted = exc.characters_written
                sent_chunks[-1] = sent_chunks[-1][:accepted]
                received_chunks.append(rf.read())
                marker = b'BLOCKED'
                wf.write(marker)
                sent_chunks.append(marker)

        # Drain the writer's buffer, reading whenever the pipe is full.
        while True:
            try:
                wf.flush()
                break
            except self.BlockingIOError as exc:
                self.assertEqual(exc.args[0], errno.EAGAIN)
                self.assertEqual(exc.args[2], exc.characters_written)
                self.assertEqual(exc.characters_written, 0)
                received_chunks.append(rf.read())

        # Collect what is left until read() returns None.
        received_chunks += iter(rf.read, None)

    sent_data = b''.join(sent_chunks)
    received_data = b''.join(received_chunks)
    self.assertTrue(sent_data == received_data)
    self.assertTrue(wf.closed)
    self.assertTrue(rf.closed)
2832
def test_create_fail(self):
    # 'x' mode fails if file is existing
    self.open(support.TESTFN, 'w').close()
    with self.assertRaises(FileExistsError):
        self.open(support.TESTFN, 'x')
2838
def test_create_writes(self):
    # 'x' mode opens for writing
    payload = b"spam"
    with self.open(support.TESTFN, 'xb') as writer:
        writer.write(payload)
    with self.open(support.TESTFN, 'rb') as reader:
        stored = reader.read()
    self.assertEqual(payload, stored)
2845
class CMiscIOTest(MiscIOTest):
    # Runs the shared MiscIOTest cases with the "io" module under test.
    io = io
2848
class PyMiscIOTest(MiscIOTest):
    # Runs the shared MiscIOTest cases with the "pyio" module under test.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002851
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002852
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Checks of how file objects behave when SIGALRM interrupts I/O.

    Subclasses provide the implementation under test as the ``io`` class
    attribute.  Every check arms a one-second alarm; the alarm is always
    cancelled again on the way out so that a check failing early cannot
    leave a SIGALRM pending for whatever runs next.
    """

    def setUp(self):
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        # Safety net: never restore the old handler with an alarm pending.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Default SIGALRM handler: raises ZeroDivisionError on purpose.
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        *item* is the unit of data written repeatedly, *bytes* its
        encoded form as it appears on the pipe, and *fdopen_kwargs* are
        passed to ``self.io.open`` (``closefd`` is forced to False).
        """
        read_results = []
        def _read():
            # Keep SIGALRM away from the reader thread where possible.
            if hasattr(signal, 'pthread_sigmask'):
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # Bound up front so the cleanup below never hits an unbound name
        # if open() itself fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            signal.alarm(1)
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed above.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            self.assertRaises(ZeroDivisionError,
                        wio.write, item * (support.PIPE_MAX_SIZE // len(item)))
            t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            # Cancel the alarm in case we got here before it fired.
            signal.alarm(0)
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check a write() issued from a signal handler during a write().

        The outer loop keeps writing *data* until the alarm fires; the
        handler then writes to the same file again.  Either outcome
        described in the comment below is accepted.
        """
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            # Cancel the alarm in case we got here before it fired.
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *decode* converts the value returned by read() to ``str`` for the
        comparison; *fdopen_kwargs* are passed to ``self.io.open``
        (``closefd`` is forced to False).
        """
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound up front so the cleanup below never hits an unbound name
        # if open() itself fails.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            # Cancel the alarm in case we got here before it fired.
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *item* is a single unit of data, written ``N`` times in one call;
        *fdopen_kwargs* are passed to ``self.io.open`` (``closefd`` is
        forced to False).
        """
        select = support.import_module("select")
        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = 1024 * 1024
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        def _read():
            while not write_finished:
                while r in select.select([r], [], [], 1.0)[0]:
                    s = os.read(r, 1024)
                    read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: re-arm with the handler that starts the reader.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            t.start()
        signal.signal(signal.SIGALRM, alarm1)
        # Bound up front so the cleanup below never hits an unbound name
        # if open() itself fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            self.assertEqual(N, wio.write(item * N))
            wio.flush()
            write_finished = True
            t.join()
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            # Cancel the alarm in case we got here before it fired.
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3042
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003043
class CSignalsTest(SignalsTest):
    # Runs the shared SignalsTest cases with the "io" module under test.
    io = io
3046
class PySignalsTest(SignalsTest):
    # Runs the shared SignalsTest cases with the "pyio" module under test.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.  Rebinding the inherited names to None removes
    # them from this class.
    test_reentrant_write_buffered = None
    test_reentrant_write_text = None
3054
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003055
def test_main():
    """Inject the module namespaces into the test classes and run them.

    Classes whose name starts with "C" receive the members of ``io``,
    those starting with "Py" the members of ``pyio``; each also receives
    the matching prefixed variant of every mock class under the mock's
    unprefixed name.  Other classes are run untouched.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    module_globals = globals()

    # One namespace per class-name prefix, checked in this order.
    prefixes = ("C", "Py")
    namespaces = {}
    for prefix, module in zip(prefixes, (io, pyio)):
        namespace = {name: getattr(module, name) for name in all_members}
        for mock in mocks:
            namespace[mock.__name__] = module_globals[prefix + mock.__name__]
        namespaces[prefix] = namespace
    # Avoid turning open into a bound method.
    namespaces["Py"]["open"] = pyio.OpenWrapper

    for test in tests:
        for prefix in prefixes:
            if test.__name__.startswith(prefix):
                for name, obj in namespaces[prefix].items():
                    setattr(test, name, obj)
                break

    support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00003090
if __name__ == "__main__":
    # Executed as a script: run the suite through test_main().
    test_main()