blob: ea82cea17ae688570c299ad9b62539c419430d59 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#   (only enabled with -ulargefile)
12
################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import random
Guido van Rossum28524c72007-02-27 05:47:44 +000027import unittest
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000028import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000029import abc
Antoine Pitroub46b9d52010-08-21 19:09:32 +000030import signal
31import errno
Antoine Pitroue033e062010-10-29 10:38:18 +000032import warnings
Antoine Pitrou243757e2010-11-05 21:15:39 +000033import pickle
Georg Brandl1b37e872010-03-14 10:45:50 +000034from itertools import cycle, count
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000035from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000036from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000037
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000038import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000039import io # C implementation of io
40import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000041try:
42 import threading
43except ImportError:
44 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010045try:
46 import fcntl
47except ImportError:
48 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000049
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000050def _default_chunk_size():
51 """Get the default TextIOWrapper chunk size"""
52 with open(__file__, "r", encoding="latin1") as f:
53 return f._CHUNK_SIZE
54
55
class MockRawIOWithoutRead:
    """Scripted raw stream that offers readinto() but no read().

    Leaving read() undefined means the default RawIO.read(), which is built
    on readinto(), is what gets exercised.
    """

    def __init__(self, read_stack=()):
        # Scripted input: each item is a bytes chunk or None.
        self._read_stack = list(read_stack)
        # Every write() call appends one bytes object here.
        self._write_stack = []
        # Call counters: all readinto() calls, and those made after the
        # script ran out.
        self._reads = 0
        self._extraneous_reads = 0

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def fileno(self):
        return 42

    def seek(self, pos, whence):
        # wrong but we gotta return something
        return 0

    def tell(self):
        # same comment as above
        return 0

    def truncate(self, pos=None):
        # Nothing is resized; the argument is simply echoed back.
        return pos

    def write(self, b):
        """Record a bytes copy of *b* and return len(b)."""
        snapshot = bytes(b)
        self._write_stack.append(snapshot)
        return len(b)

    def readinto(self, buf):
        """Copy the next scripted chunk into *buf*.

        Returns the number of bytes stored, None when the next scripted item
        is None (the item is consumed), or 0 once the script is exhausted.
        A chunk larger than *buf* is split: the part that did not fit stays
        at the head of the script for the next call.
        """
        self._reads += 1
        capacity = len(buf)
        if not self._read_stack:
            self._extraneous_reads += 1
            return 0
        chunk = self._read_stack[0]
        if chunk is None:
            self._read_stack.pop(0)
            return None
        size = len(chunk)
        if size > capacity:
            # Fill the whole buffer and keep the remainder for later.
            buf[:] = chunk[:capacity]
            self._read_stack[0] = chunk[capacity:]
            return capacity
        self._read_stack.pop(0)
        buf[:size] = chunk
        return size
111
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead combined with the C implementation's RawIOBase."""
114
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead combined with the pure-Python RawIOBase."""
117
118
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead extended with an explicit, scripted read()."""

    def read(self, n=None):
        """Return the next scripted item whole; *n* is ignored.

        Once the script is exhausted, every further call returns b"" and is
        counted in ``_extraneous_reads``.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted script means EOF.  Anything else (including
            # KeyboardInterrupt) must propagate rather than pose as EOF.
            self._extraneous_reads += 1
            return b""
128
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO combined with the C implementation's RawIOBase."""
131
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO combined with the pure-Python RawIOBase."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000134
Guido van Rossuma9e20242007-03-08 00:43:48 +0000135
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods deliberately report bogus results."""

    def write(self, b):
        # Claim twice as many bytes as the parent reports.
        written = super().write(b)
        return written * 2

    def read(self, n=None):
        # Hand back the parent's result repeated twice.
        data = super().read(n)
        return data * 2

    def seek(self, pos, whence):
        # A negative position, whatever was asked for.
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Let the parent fill the buffer, then report five times the
        # buffer's length.
        super().readinto(buf)
        return 5 * len(buf)
152
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO combined with the C implementation's RawIOBase."""
155
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO combined with the pure-Python RawIOBase."""
158
159
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() call raises IOError."""

    closed = 0

    def close(self):
        # The flag is set before raising, so later calls return silently.
        if self.closed:
            return
        self.closed = 1
        raise IOError
167
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO combined with the C implementation's RawIOBase."""
170
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO combined with the pure-Python RawIOBase."""
173
174
class MockFileIO:
    """Mixin that logs the outcome of every read() and readinto() call.

    ``read_history`` receives one entry per call: the length of the data
    read() returned (or None when it returned None), or whatever readinto()
    returned.
    """

    def __init__(self, data):
        # The log must exist before the next class in the MRO is initialised.
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        result = super().read(n)
        if result is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(result))
        return result

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000190
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO layered over the C implementation's BytesIO."""
Guido van Rossuma9e20242007-03-08 00:43:48 +0000193
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO layered over the pure-Python BytesIO."""
196
197
class MockUnseekableIO:
    """Mixin that presents a stream as not seekable.

    seek() and tell() raise ``self.UnsupportedOperation``, an attribute this
    class does not define itself.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def tell(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error
207
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """Unseekable BytesIO built on the C implementation."""

    # Exception type raised by the inherited seek() and tell().
    UnsupportedOperation = io.UnsupportedOperation
210
class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """Unseekable BytesIO built on the pure-Python implementation."""

    # Exception type raised by the inherited seek() and tell().
    UnsupportedOperation = pyio.UnsupportedOperation
213
214
class MockNonBlockWriterIO:
    """Raw writer mock that can simulate a write that would block."""

    def __init__(self):
        # Chunks accepted so far, and the byte sequence that triggers a block
        # (None while no blocker is armed).
        self._write_stack = []
        self._blocker_char = None

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def pop_written(self):
        """Return everything written so far and empty the record."""
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def write(self, b):
        """Accept *b*, stopping short at an armed blocker.

        With a blocker armed and present in the data, only the bytes before
        it are recorded and their count is returned.  When the blocker is the
        very first byte, nothing is recorded, the blocker is disarmed and
        None is returned.  Otherwise all of *b* is recorded.
        """
        payload = bytes(b)
        blocker = self._blocker_char
        if blocker:
            try:
                position = payload.index(blocker)
            except ValueError:
                position = None
            if position is not None:
                if position > 0:
                    # write data up to the first blocker
                    self._write_stack.append(payload[:position])
                    return position
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        self._write_stack.append(payload)
        return len(payload)
258
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO combined with the C implementation's RawIOBase."""

    # Matching BlockingIOError type for this implementation.
    BlockingIOError = io.BlockingIOError
261
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO combined with the pure-Python RawIOBase."""

    # Matching BlockingIOError type for this implementation.
    BlockingIOError = pyio.BlockingIOError
264
Guido van Rossuma9e20242007-03-08 00:43:48 +0000265
class IOTest(unittest.TestCase):
    """Implementation-independent tests of open() and the io base classes.

    The methods reach the code under test through attributes of ``self``
    (open, FileIO, BytesIO, StringIO, IOBase, RawIOBase, BufferedIOBase,
    TextIOBase, UnsupportedOperation, SEEK_CUR, SEEK_END,
    MockRawIOWithoutRead) which are not defined in this class.
    """

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        """Run a fixed write/seek/truncate sequence on the writable file *f*.

        The first 12 bytes of *f* end up as b"hello world\\n".
        """
        self.assertEqual(f.write(b"blah."), 5)
        # truncate() is expected to leave the file position alone.
        f.truncate(0)
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Run a fixed read/seek sequence on *f* holding b"hello world\\n".

        With *buffered* true, argument-less read() calls are checked too.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        # Only two bytes are left; the buffer keeps its size.
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by the large-file tests.
    LARGE = 2**31

    def large_file_ops(self, f):
        """Seek, write, truncate and read around offset LARGE in *f*."""
        # Real assertions rather than ``assert`` statements, so the checks
        # survive running under -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        exc = self.UnsupportedOperation
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(exc, fp.read)
                self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "wb", buffering=0) as fp:
            self.assertRaises(exc, fp.read)
            self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "rb", buffering=0) as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(exc, fp.write, "blah")
            self.assertRaises(exc, fp.writelines, ["blah\n"])
            # Non-zero seeking from current or end pos
            self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
            self.assertRaises(exc, fp.seek, -1, self.SEEK_END)

    def test_open_handles_NUL_chars(self):
        # A file name with an embedded NUL is rejected, as str or as bytes.
        fn_with_NUL = 'foo\0bar'
        self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
        self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')

    def test_raw_file_io(self):
        # buffering=0 selects the unbuffered file object.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            # A size limit may split a line across two calls.
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space,
        # therefore the resource must be enabled to run this test.
        if sys.platform.startswith('win') or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                # Report a skip instead of printing and returning, which
                # would count the test as passed.
                self.skipTest(
                    "large file ops on %s require %d bytes and a long time; "
                    "use 'regrtest.py -u largefile test_io' to run it"
                    % (sys.platform, self.LARGE))
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # The file is closed on leaving the with block, whether the block
        # finishes normally or raises.
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        # Destroying an unclosed FileIO subclass instance must call __del__,
        # close() and flush(), in that order, and the data must reach disk.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        with support.check_warnings(('', ResourceWarning)):
            f = MyFileIO(support.TESTFN, "wb")
            f.write(b"xxx")
            del f
            support.gc_collect()
            self.assertEqual(record, [1, 2, 3])
            with self.open(support.TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        """Check the __del__/close()/flush() call order for *base*.

        A subclass of *base* records each call using values stored on the
        instance, so the check also covers attribute access during
        destruction.
        """
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        # Data written before close() must be readable afterwards.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        # write() accepts an array and reports its size in bytes.
        a = array.array('i', range(10))
        n = len(a.tobytes())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        # Reading from a closed file object raises ValueError, even though
        # closefd=False left the underlying descriptor open.
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        with support.check_warnings(('', ResourceWarning)):
            f = self.FileIO(support.TESTFN, "wb")
            f.write(b"abcxxx")
            # Put the object in a reference cycle.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed

    def test_multi_close(self):
        # Repeated close() calls must not raise; flush() afterwards must.
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")

    def test_types_have_dict(self):
        # Instances of these types must carry an instance __dict__.
        test = (
            self.IOBase(),
            self.RawIOBase(),
            self.TextIOBase(),
            self.StringIO(),
            self.BytesIO()
        )
        for obj in test:
            self.assertTrue(hasattr(obj, "__dict__"))
636
class CIOTest(IOTest):
    """IOTest plus a check that is run only in this subclass."""

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        obj.obj = obj
        wr = weakref.ref(obj)
        del MyIO
        del obj
        support.gc_collect()
        # assertIsNone reports the surviving referent on failure.
        self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000656
class PyIOTest(IOTest):
    """IOTest with no additional test methods."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000659
Guido van Rossuma9e20242007-03-08 00:43:48 +0000660
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000661class CommonBufferedTests:
662 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
663
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000664 def test_detach(self):
665 raw = self.MockRawIO()
666 buf = self.tp(raw)
667 self.assertIs(buf.detach(), raw)
668 self.assertRaises(ValueError, buf.detach)
669
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000670 def test_fileno(self):
671 rawio = self.MockRawIO()
672 bufio = self.tp(rawio)
673
Ezio Melottib3aedd42010-11-20 19:04:17 +0000674 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000675
    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        # Placeholder: nothing is checked here yet.
        pass
680
681 def test_invalid_args(self):
682 rawio = self.MockRawIO()
683 bufio = self.tp(rawio)
684 # Invalid whence
685 self.assertRaises(ValueError, bufio.seek, 0, -1)
686 self.assertRaises(ValueError, bufio.seek, 0, 3)
687
688 def test_override_destructor(self):
689 tp = self.tp
690 record = []
691 class MyBufferedIO(tp):
692 def __del__(self):
693 record.append(1)
694 try:
695 f = super().__del__
696 except AttributeError:
697 pass
698 else:
699 f()
700 def close(self):
701 record.append(2)
702 super().close()
703 def flush(self):
704 record.append(3)
705 super().flush()
706 rawio = self.MockRawIO()
707 bufio = MyBufferedIO(rawio)
708 writable = bufio.writable()
709 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000710 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000711 if writable:
712 self.assertEqual(record, [1, 2, 3])
713 else:
714 self.assertEqual(record, [1, 2])
715
716 def test_context_manager(self):
717 # Test usability as a context manager
718 rawio = self.MockRawIO()
719 bufio = self.tp(rawio)
720 def _with():
721 with bufio:
722 pass
723 _with()
724 # bufio should now be closed, and using it a second time should raise
725 # a ValueError.
726 self.assertRaises(ValueError, _with)
727
728 def test_error_through_destructor(self):
729 # Test that the exception state is not modified by a destructor,
730 # even if close() fails.
731 rawio = self.CloseFailureIO()
732 def f():
733 self.tp(rawio).xyzzy
734 with support.captured_output("stderr") as s:
735 self.assertRaises(AttributeError, f)
736 s = s.getvalue().strip()
737 if s:
738 # The destructor *may* have printed an unraisable error, check it
739 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000740 self.assertTrue(s.startswith("Exception IOError: "), s)
741 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000742
Antoine Pitrou716c4442009-05-23 19:04:03 +0000743 def test_repr(self):
744 raw = self.MockRawIO()
745 b = self.tp(raw)
746 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
747 self.assertEqual(repr(b), "<%s>" % clsname)
748 raw.name = "dummy"
749 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
750 raw.name = b"dummy"
751 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
752
Antoine Pitrou6be88762010-05-03 16:48:20 +0000753 def test_flush_error_on_close(self):
754 raw = self.MockRawIO()
755 def bad_flush():
756 raise IOError()
757 raw.flush = bad_flush
758 b = self.tp(raw)
759 self.assertRaises(IOError, b.close) # exception not swallowed
760
761 def test_multi_close(self):
762 raw = self.MockRawIO()
763 b = self.tp(raw)
764 b.close()
765 b.close()
766 b.close()
767 self.assertRaises(ValueError, b.flush)
768
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000769 def test_unseekable(self):
770 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
771 self.assertRaises(self.UnsupportedOperation, bufio.tell)
772 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
773
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000774 def test_readonly_attributes(self):
775 raw = self.MockRawIO()
776 buf = self.tp(raw)
777 x = self.MockRawIO()
778 with self.assertRaises(AttributeError):
779 buf.raw = x
780
Guido van Rossum78892e42007-04-06 17:31:18 +0000781
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Reader-side tests.  Subclasses bind `tp` to the buffered reader class
    # under test; `read_mode` is the mode used when a real file is opened.
    read_mode = "rb"

    def test_constructor(self):
        # __init__ may be re-invoked on a live object with valid sizes, and
        # rejects zero or negative buffer sizes.
        raw_stream = self.MockRawIO([b"abc"])
        reader = self.tp(raw_stream)
        for kwargs in ({}, {"buffer_size": 1024}, {"buffer_size": 16}):
            reader.__init__(raw_stream, **kwargs)
        self.assertEqual(b"abc", reader.read())
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                reader.__init__(raw_stream, buffer_size=bad_size)
        # After the failed calls, a valid re-initialisation works again.
        raw_stream = self.MockRawIO([b"abc"])
        reader.__init__(raw_stream)
        self.assertEqual(b"abc", reader.read())

    def test_read(self):
        for size in (None, 7):
            raw_stream = self.MockRawIO((b"abc", b"d", b"efg"))
            reader = self.tp(raw_stream)
            self.assertEqual(b"abcdefg", reader.read(size))
        # Invalid args
        with self.assertRaises(ValueError):
            reader.read(-2)

    def test_read1(self):
        # read1() is checked against the number of raw reads performed so
        # far, as counted by the mock in `_reads`.
        raw_stream = self.MockRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw_stream)
        self.assertEqual(b"a", reader.read(1))
        steps = (
            # (requested size, expected data, raw reads so far)
            (1, b"b", 1),
            (100, b"c", 1),
            (100, b"d", 2),
            (100, b"efg", 3),
            (100, b"", 4),
        )
        for size, expected, raw_reads in steps:
            self.assertEqual(expected, reader.read1(size))
            self.assertEqual(raw_stream._reads, raw_reads)
        # Invalid args
        with self.assertRaises(ValueError):
            reader.read1(-1)

    def test_readinto(self):
        raw_stream = self.MockRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw_stream)
        target = bytearray(2)
        # Bytes not overwritten by a short read keep their previous value,
        # hence b"gf" for the last two steps.
        steps = (
            (2, b"ab"),
            (2, b"cd"),
            (2, b"ef"),
            (1, b"gf"),
            (0, b"gf"),
        )
        for count_read, contents in steps:
            self.assertEqual(reader.readinto(target), count_read)
            self.assertEqual(target, contents)

    def test_readlines(self):
        def fresh_reader():
            return self.tp(self.MockRawIO((b"abc\n", b"d\n", b"ef")))

        all_lines = [b"abc\n", b"d\n", b"ef"]
        self.assertEqual(fresh_reader().readlines(), all_lines)
        self.assertEqual(fresh_reader().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(fresh_reader().readlines(None), all_lines)

    def test_buffering(self):
        payload = b"abcdefghi"
        total = len(payload)

        # Each case: buffer size, sizes requested from the buffered reader,
        # and the read sizes the raw stream is expected to record.
        cases = [
            [ 100, [ 3, 1, 4, 8 ], [ total, 0 ] ],
            [ 100, [ 3, 3, 3], [ total ] ],
            [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, requested_sizes, expected_raw_sizes in cases:
            raw_stream = self.MockFileIO(payload)
            reader = self.tp(raw_stream, buffer_size=bufsize)
            offset = 0
            for nbytes in requested_sizes:
                chunk = payload[offset:offset + nbytes]
                self.assertEqual(reader.read(nbytes), chunk)
                offset += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(raw_stream.read_history, expected_raw_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        raw_stream = self.MockRawIO(
            (b"abc", b"d", None, b"efg", None, None, None))
        reader = self.tp(raw_stream)
        self.assertEqual(b"abcd", reader.read(6))
        self.assertEqual(b"e", reader.read(1))
        self.assertEqual(b"fg", reader.read())
        self.assertEqual(b"", reader.peek(1))
        self.assertIsNone(reader.read())
        self.assertEqual(b"", reader.read())

        # The raw stream's own readall() is checked the same way.
        raw_stream = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", raw_stream.readall())
        self.assertIsNone(raw_stream.readall())

    def test_read_past_eof(self):
        # Asking for more than is available returns what there is.
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
        self.assertEqual(b"abcdefg", reader.read(9000))

    def test_read_all(self):
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
        self.assertEqual(b"abcdefg", reader.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            with self.open(support.TESTFN, "wb") as out:
                out.write(bytes(values))
            with self.open(support.TESTFN, self.read_mode,
                           buffering=0) as raw:
                reader = self.tp(raw, 8)
                errors = []
                results = []

                def worker():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for size in cycle([1, 19]):
                            chunk = reader.read(size)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as exc:
                        errors.append(exc)
                        raise

                threads = [threading.Thread(target=worker)
                           for _ in range(20)]
                for thread in threads:
                    thread.start()
                time.sleep(0.02) # yield
                for thread in threads:
                    thread.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                combined = b''.join(results)
                for value in range(256):
                    self.assertEqual(combined.count(bytes([value])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        # Same as the shared check, but repeated after a read has put data
        # in the buffer.
        reader = self.tp(self.MockUnseekableIO(b"A" * 10))
        with self.assertRaises(self.UnsupportedOperation):
            reader.tell()
        with self.assertRaises(self.UnsupportedOperation):
            reader.seek(0)
        reader.read(1)
        with self.assertRaises(self.UnsupportedOperation):
            reader.seek(0)
        with self.assertRaises(self.UnsupportedOperation):
            reader.tell()

    def test_misbehaved_io(self):
        # Over a MisbehavedRawIO, seek() and tell() must raise IOError.
        reader = self.tp(self.MisbehavedRawIO((b"abc", b"d", b"efg")))
        with self.assertRaises(IOError):
            reader.seek(0)
        with self.assertRaises(IOError):
            reader.tell()

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            wanted = b"x" * n
            # Simple case: one raw read is enough to satisfy the request.
            raw_stream = self.MockRawIO([wanted])
            reader = self.tp(raw_stream, bufsize)
            self.assertEqual(reader.read(n), wanted)
            self.assertEqual(raw_stream._extraneous_reads, 0,
                "failed for {}: {} != 0".format(
                    n, raw_stream._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            raw_stream = self.MockRawIO([b"x" * (n - 1), b"x"])
            reader = self.tp(raw_stream, bufsize)
            self.assertEqual(reader.read(n), wanted)
            self.assertEqual(raw_stream._extraneous_reads, 0,
                "failed for {}: {} != 0".format(
                    n, raw_stream._extraneous_reads))
971
972
class CBufferedReaderTest(BufferedReaderTest):
    # Runs the reader tests against io.BufferedReader and adds checks that
    # are only asserted for that implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After a failed __init__ the object must refuse to read.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # Opening with "w+b" creates TESTFN on disk; remove it when the test
        # finishes so it does not leak into later tests.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Self-reference: only the cyclic collector can reclaim the object.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001013
# Runs the inherited BufferedReaderTest tests with `tp` bound to
# pyio.BufferedReader.
# NOTE(review): `pyio` is bound outside this chunk; presumably the
# pure-Python io implementation -- confirm against the file's imports.
class PyBufferedReaderTest(BufferedReaderTest):
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001016
Guido van Rossuma9e20242007-03-08 00:43:48 +00001017
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Writer-side tests.  Subclasses bind `tp` to the buffered writer class
    # under test; `write_mode` is the mode used when a real file is opened.
    write_mode = "wb"

    def test_constructor(self):
        # __init__ may be re-invoked on a live object with valid sizes, and
        # rejects zero or negative buffer sizes.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # detach() must flush pending data to the raw stream first.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Helper (not itself a test): performs lots of writes, calling
        # intermediate_func(bufio) after each one, then checks the flushed
        # output is as expected.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seeking away and back between writes must not disturb the output.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        # Seeking flushes buffered data, so overwriting after a rewind is
        # visible in the raw BytesIO.
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_destructor(self):
        # Dropping the last reference must flush buffered data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # The test creates TESTFN on disk; make sure it is removed again
        # whether or not the assertions pass.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            # truncate() must not move the stream position.
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Write order is arbitrary, so only per-byte counts are checked.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing the third positional argument must emit a
        # DeprecationWarning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001233
1234
class CBufferedWriterTest(BufferedWriterTest):
    # Runs the writer tests against io.BufferedWriter and adds checks that
    # are only asserted for that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After a failed __init__ the object must refuse to write.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The test creates TESTFN on disk; remove it when the test finishes
        # so it does not leak into later tests.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Self-reference: only the cyclic collector can reclaim the object.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1272
1273
# Runs the inherited BufferedWriterTest tests with `tp` bound to
# pyio.BufferedWriter.
# NOTE(review): `pyio` is bound outside this chunk; presumably the
# pure-Python io implementation -- confirm against the file's imports.
class PyBufferedWriterTest(BufferedWriterTest):
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001276
class BufferedRWPairTest(unittest.TestCase):
    # Tests for the reader/writer pair class.  Subclasses bind `tp` to the
    # pair class under test; it is built from a reader and a writer object.

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_detach(self):
        # A pair has no single underlying stream to hand back.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_deprecation(self):
        # Passing the fourth positional argument must emit a
        # DeprecationWarning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        # An explicit None size behaves like "read everything".
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        # An explicit None hint behaves like no hint (this assertion used to
        # be an exact duplicate of the one above).
        self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        # Each write/flush round must reach the writer side as one chunk.
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # peek() may return more than requested, so only the prefix is
        # checked; it must not consume the data.
        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        # The pair is a tty as soon as either side reports being one.
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001394
# Runs the inherited BufferedRWPairTest tests with `tp` bound to
# io.BufferedRWPair.
class CBufferedRWPairTest(BufferedRWPairTest):
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001397
# Runs the inherited BufferedRWPairTest tests with `tp` bound to
# pyio.BufferedRWPair.
# NOTE(review): `pyio` is bound outside this chunk; presumably the
# pure-Python io implementation -- confirm against the file's imports.
class PyBufferedRWPairTest(BufferedRWPairTest):
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001400
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001401
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Shared tests for BufferedRandom; concrete subclasses supply `tp`.
    # The class under test is both readable and writable, so the reader and
    # writer test suites are inherited and combined.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack)  # Buffer writes
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        rw.write(b"123f")
        rw.seek(0, 0)
        self.assertEqual(b"asdf123fl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        rw.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Shared driver: `read_func(bufio, [n])` must return the bytes it
        # consumed, so that read(), readinto() and peek() can all be
        # exercised through the same sequence of writes and flushes.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)

        # Second variant: step back one byte, peek, then restore the
        # position that was current before the peek.
        def _peek_previous(bufio):
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek_previous)

    def test_writes_and_reads(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            s = raw.getvalue()
            self.assertEqual(s,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards /
        # writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        b = b"\x80\x81\x82\x83\x84"
        for i in range(0, len(b)):
            for j in range(i, len(b)):
                raw = self.BytesIO(b)
                bufio = self.tp(raw, 100)
                mutate(bufio, i, j)
                bufio.flush()
                expected = bytearray(b)
                # Order matters when i == j: the later write (0x01) wins.
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA")  # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2)  # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as raw:
            with self.tp(raw, 100) as f:
                f.write(b"1")
                self.assertEqual(f.read(1), b'b')
                f.write(b'2')
                self.assertEqual(f.read1(1), b'd')
                f.write(b'3')
                buf = bytearray(1)
                f.readinto(buf)
                self.assertEqual(buf, b'f')
                f.write(b'4')
                self.assertEqual(f.peek(1), b'h')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as raw:
            with self.tp(raw, 100) as f:
                self.assertEqual(f.read(1), b'a')
                f.write(b"2")
                self.assertEqual(f.read(1), b'c')
                f.flush()
                self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        with self.BytesIO(b'ab\ncdef\ng\n') as raw:
            with self.tp(raw) as f:
                f.write(b'1')
                self.assertEqual(f.readline(), b'b\n')
                f.write(b'2')
                self.assertEqual(f.readline(), b'def\n')
                f.write(b'3')
                self.assertEqual(f.readline(), b'\n')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None
1625
class CBufferedRandomTest(BufferedRandomTest):
    # Runs the shared BufferedRandom tests against io.BufferedRandom.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            # Re-initialising with an absurd buffer size must be rejected.
            self.assertRaises((OverflowError, MemoryError, ValueError),
                              bufio.__init__, rawio, sys.maxsize)

    def test_garbage_collection(self):
        CBufferedReaderTest.test_garbage_collection(self)
        CBufferedWriterTest.test_garbage_collection(self)
1642
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the shared BufferedRandom tests against pyio.BufferedRandom.
    tp = pyio.BufferedRandom
1645
1646
# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
# properties:
#   - A single output character can correspond to many bytes of input.
#   - The number of input bytes to complete the character can be
#     undetermined until the last input byte is received.
#   - The number of input bytes can vary depending on previous input.
#   - A single input byte can correspond to many characters of output.
#   - The number of output characters can be undetermined until the
#     last input byte is received.
#   - The number of output characters can vary depending on previous input.

class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and no buffered input."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags), with flags packing I and O."""
        i, o = self.i ^ 1, self.o ^ 1  # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a (buffered bytes, flags) pair produced by getstate()."""
        buffer, io = state
        self.buffer = bytearray(buffer)
        i, o = divmod(io, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Feed the bytes of *input*; return the text of every word completed.

        When *final* is true, a pending partial word is emitted as well.
        """
        output = ''
        for b in input:
            if self.i == 0:  # variable-length, terminated with period
                if b == ord('.'):
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else:  # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer:  # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        'i'/'o' control words update the lengths and produce no output.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o  # pad out with hyphens
            if self.o:
                output = output[:self.o]  # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # Tests flip this to True to make the 'test_decoder' codec resolvable.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: resolve 'test_decoder' while enabled."""
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        # Any other name (or a disabled codec) is not ours to resolve.
        return None

# Register the previous decoder for testing.
# Disabled by default, tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1746
1747
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag passed to decode, expected text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for input, eof, output in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(input, eof), output)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001790
1791class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001792
def setUp(self):
    # Sample data mixing "\r\n", "\r" and "\n" line endings, together with
    # its "\n"-only text form; also start from a clean scratch file.
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
    support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001797
def tearDown(self):
    # Remove the scratch file that individual tests may have created.
    support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001800
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001801 def test_constructor(self):
1802 r = self.BytesIO(b"\xc3\xa9\n\n")
1803 b = self.BufferedReader(r, 1000)
1804 t = self.TextIOWrapper(b)
1805 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001806 self.assertEqual(t.encoding, "latin1")
1807 self.assertEqual(t.line_buffering, False)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001808 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001809 self.assertEqual(t.encoding, "utf8")
1810 self.assertEqual(t.line_buffering, True)
1811 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001812 self.assertRaises(TypeError, t.__init__, b, newline=42)
1813 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1814
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001815 def test_detach(self):
1816 r = self.BytesIO()
1817 b = self.BufferedWriter(r)
1818 t = self.TextIOWrapper(b)
1819 self.assertIs(t.detach(), b)
1820
1821 t = self.TextIOWrapper(b, encoding="ascii")
1822 t.write("howdy")
1823 self.assertFalse(r.getvalue())
1824 t.detach()
1825 self.assertEqual(r.getvalue(), b"howdy")
1826 self.assertRaises(ValueError, t.detach)
1827
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001828 def test_repr(self):
1829 raw = self.BytesIO("hello".encode("utf-8"))
1830 b = self.BufferedReader(raw)
1831 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001832 modname = self.TextIOWrapper.__module__
1833 self.assertEqual(repr(t),
1834 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1835 raw.name = "dummy"
1836 self.assertEqual(repr(t),
1837 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001838 t.mode = "r"
1839 self.assertEqual(repr(t),
1840 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001841 raw.name = b"dummy"
1842 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001843 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001844
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001845 def test_line_buffering(self):
1846 r = self.BytesIO()
1847 b = self.BufferedWriter(r, 1000)
1848 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001849 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001850 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001851 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001852 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001853 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001854 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001855
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001856 def test_encoding(self):
1857 # Check the encoding attribute is always set, and valid
1858 b = self.BytesIO()
1859 t = self.TextIOWrapper(b, encoding="utf8")
1860 self.assertEqual(t.encoding, "utf8")
1861 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001862 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001863 codecs.lookup(t.encoding)
1864
1865 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001866 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001867 b = self.BytesIO(b"abc\n\xff\n")
1868 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001869 self.assertRaises(UnicodeError, t.read)
1870 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001871 b = self.BytesIO(b"abc\n\xff\n")
1872 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001873 self.assertRaises(UnicodeError, t.read)
1874 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001875 b = self.BytesIO(b"abc\n\xff\n")
1876 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001877 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001878 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001879 b = self.BytesIO(b"abc\n\xff\n")
1880 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001881 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001882
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001883 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001884 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001885 b = self.BytesIO()
1886 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001887 self.assertRaises(UnicodeError, t.write, "\xff")
1888 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001889 b = self.BytesIO()
1890 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001891 self.assertRaises(UnicodeError, t.write, "\xff")
1892 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001893 b = self.BytesIO()
1894 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001895 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001896 t.write("abc\xffdef\n")
1897 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001898 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001899 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001900 b = self.BytesIO()
1901 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001902 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001903 t.write("abc\xffdef\n")
1904 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001905 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001906
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001907 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001908 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1909
1910 tests = [
1911 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001912 [ '', input_lines ],
1913 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1914 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1915 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001916 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001917 encodings = (
1918 'utf-8', 'latin-1',
1919 'utf-16', 'utf-16-le', 'utf-16-be',
1920 'utf-32', 'utf-32-le', 'utf-32-be',
1921 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001922
Guido van Rossum8358db22007-08-18 21:39:55 +00001923 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001924 # character in TextIOWrapper._pending_line.
1925 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001926 # XXX: str.encode() should return bytes
1927 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001928 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001929 for bufsize in range(1, 10):
1930 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001931 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1932 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001933 encoding=encoding)
1934 if do_reads:
1935 got_lines = []
1936 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001937 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001938 if c2 == '':
1939 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00001940 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001941 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001942 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001943 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001944
1945 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001946 self.assertEqual(got_line, exp_line)
1947 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00001948
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001949 def test_newlines_input(self):
1950 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001951 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1952 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001953 (None, normalized.decode("ascii").splitlines(True)),
1954 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001955 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1956 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1957 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001958 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001959 buf = self.BytesIO(testdata)
1960 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001961 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00001962 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001963 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00001964
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001965 def test_newlines_output(self):
1966 testdict = {
1967 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1968 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1969 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1970 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1971 }
1972 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1973 for newline, expected in tests:
1974 buf = self.BytesIO()
1975 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1976 txt.write("AAA\nB")
1977 txt.write("BB\nCCC\n")
1978 txt.write("X\rY\r\nZ")
1979 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001980 self.assertEqual(buf.closed, False)
1981 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001982
def test_destructor(self):
    # Dropping the last reference to the wrapper must flush pending text
    # and close the underlying stream.
    l = []
    base = self.BytesIO
    class MyBytesIO(base):
        def close(self):
            # Record the contents at close time, before they are discarded.
            l.append(self.getvalue())
            base.close(self)
    b = MyBytesIO()
    t = self.TextIOWrapper(b, encoding="ascii")
    t.write("abc")
    del t
    support.gc_collect()
    self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001996
def test_override_destructor(self):
    # Overridden __del__, close() and flush() must each be invoked, in
    # that order, when the wrapper is destroyed.
    record = []
    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            record.append(1)
            # The base class may or may not define __del__.
            try:
                f = super().__del__
            except AttributeError:
                pass
            else:
                f()
        def close(self):
            record.append(2)
            super().close()
        def flush(self):
            record.append(3)
            super().flush()
    b = self.BytesIO()
    t = MyTextIO(b, encoding="ascii")
    del t
    support.gc_collect()
    self.assertEqual(record, [1, 2, 3])
2019
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()
    def f():
        # The temporary wrapper is destroyed while AttributeError is live.
        self.TextIOWrapper(rawio).xyzzy
    with support.captured_output("stderr") as s:
        self.assertRaises(AttributeError, f)
    s = s.getvalue().strip()
    if s:
        # The destructor *may* have printed an unraisable error, check it
        self.assertEqual(len(s.splitlines()), 1)
        self.assertTrue(s.startswith("Exception IOError: "), s)
        self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002034
# Systematic tests of the text I/O API
2036
def test_basic_io(self):
    # Write / read / seek / tell round trips for several chunk sizes and
    # encodings.  The files are managed by `with` so that they are closed
    # even when an assertion fails part-way through.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                # The multi-line sample contains non-Latin-1 characters.
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2065
def multi_line_test(self, f, enc):
    # Helper: rewrite `f` with lines of assorted lengths, recording tell()
    # before each write, then check that reading the file back yields the
    # same (position, line) pairs.  `enc` is accepted but not used here.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    wlines = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to reach the wanted length.
        line = "".join(sample[i % len(sample)] for i in range(size)) + "\n"
        wlines.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    rlines = []
    while True:
        pos = f.tell()
        line = f.readline()
        if not line:
            break
        rlines.append((pos, line))
    self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002087
def test_telling(self):
    """Check that tell() cookies taken while writing match those seen on read.

    Two identical non-ASCII lines are written and the cookie after each is
    recorded; reading the lines back with readline() must yield the same
    cookies.  While iterating over the file with a ``for`` loop, tell() is
    expected to raise IOError; once the loop has finished it must work
    again and report the end-of-data cookie.
    """
    # Use a context manager so the file is closed even when an assertion
    # fails part-way through.
    with self.open(support.TESTFN, "w+", encoding="utf8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is not available in the middle of line iteration.
            self.assertRaises(IOError, f.tell)
        self.assertEqual(f.tell(), p2)
2107
def test_seeking(self):
    """Check tell() after reading up to two characters short of a chunk.

    Each line is an ASCII prefix two characters shorter than the default
    chunk size followed by a multi-byte character and a newline.  After
    reading exactly the prefix, tell() must equal the prefix length and
    readline() must return the remaining suffix.
    """
    prefix_size = _default_chunk_size() - 2
    u_prefix = "a" * prefix_size
    prefix = bytes(u_prefix.encode("utf-8"))
    # The prefix is pure ASCII, so character and byte counts coincide.
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    payload = prefix + bytes(u_suffix.encode("utf-8"))
    with self.open(support.TESTFN, "wb") as out:
        out.write(payload * 2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        head = f.read(prefix_size)
        self.assertEqual(head, str(prefix, "ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002124
def test_seeking_too(self):
    """Regression test: tell() after readline() with a tiny chunk size.

    The file holds one three-byte UTF-8 character plus a newline, and the
    chunk size is forced down to 2.  The test only requires that
    readline() followed by tell() completes without raising.
    """
    with self.open(support.TESTFN, "wb") as out:
        out.write(b'\xe0\xbf\xbf\n')
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        getattr(f, "_CHUNK_SIZE")  # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002135
def test_seek_and_tell(self):
    """Test seek/tell using the StatefulIncrementalDecoder.

    Every decoder test case is checked twice: once as-is, and once behind
    a prefix of dots sized so that the case straddles a chunk boundary.
    The test codec is enabled only for the duration of this test.
    """
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def check_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened through a context manager so that a failed
        # assertion cannot leave a handle open on the scratch file.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for case_data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            check_seek_and_tell_with_data(case_data)

        # Position each test case so that it crosses a chunk boundary.
        for case_data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(case_data)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            check_seek_and_tell_with_data(prefix + case_data, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002182
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002183 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002184 data = "1234567890"
2185 tests = ("utf-16",
2186 "utf-16-le",
2187 "utf-16-be",
2188 "utf-32",
2189 "utf-32-le",
2190 "utf-32-be")
2191 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002192 buf = self.BytesIO()
2193 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002194 # Check if the BOM is written only once (see issue1753).
2195 f.write(data)
2196 f.write(data)
2197 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002198 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002199 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002200 self.assertEqual(f.read(), data * 2)
2201 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002202
Benjamin Petersona1b49012009-03-31 23:11:32 +00002203 def test_unreadable(self):
2204 class UnReadable(self.BytesIO):
2205 def readable(self):
2206 return False
2207 txt = self.TextIOWrapper(UnReadable())
2208 self.assertRaises(IOError, txt.read)
2209
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002210 def test_read_one_by_one(self):
2211 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002212 reads = ""
2213 while True:
2214 c = txt.read(1)
2215 if not c:
2216 break
2217 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002218 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002219
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002220 def test_readlines(self):
2221 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2222 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2223 txt.seek(0)
2224 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2225 txt.seek(0)
2226 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2227
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002228 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002229 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002230 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002231 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002232 reads = ""
2233 while True:
2234 c = txt.read(128)
2235 if not c:
2236 break
2237 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002238 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002239
2240 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002241 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002242
2243 # read one char at a time
2244 reads = ""
2245 while True:
2246 c = txt.read(1)
2247 if not c:
2248 break
2249 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002250 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002251
2252 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002253 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002254 txt._CHUNK_SIZE = 4
2255
2256 reads = ""
2257 while True:
2258 c = txt.read(4)
2259 if not c:
2260 break
2261 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002262 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002263
2264 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002265 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002266 txt._CHUNK_SIZE = 4
2267
2268 reads = txt.read(4)
2269 reads += txt.read(4)
2270 reads += txt.readline()
2271 reads += txt.readline()
2272 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002273 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002274
2275 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002276 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002277 txt._CHUNK_SIZE = 4
2278
2279 reads = txt.read(4)
2280 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002281 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002282
2283 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002284 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002285 txt._CHUNK_SIZE = 4
2286
2287 reads = txt.read(4)
2288 pos = txt.tell()
2289 txt.seek(0)
2290 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002291 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002292
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002293 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002294 buffer = self.BytesIO(self.testdata)
2295 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002296
2297 self.assertEqual(buffer.seekable(), txt.seekable())
2298
def test_append_bom(self):
    """The BOM is not written again when appending to a non-empty file."""
    path = support.TESTFN
    for codec in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=codec) as f:
            f.write('aaa')
            f.tell()  # result unused; the call itself is kept
        with self.open(path, 'rb') as f:
            self.assertEqual(f.read(), 'aaa'.encode(codec))

        with self.open(path, 'a', encoding=codec) as f:
            f.write('xxx')
        with self.open(path, 'rb') as f:
            # One encode() of the whole text carries exactly one BOM.
            self.assertEqual(f.read(), 'aaaxxx'.encode(codec))
Antoine Pitroue4501852009-05-14 18:55:55 +00002313
def test_seek_bom(self):
    """The BOM is not duplicated when writing after manual seeks.

    After an initial write, the file is reopened in 'r+' mode, text is
    written at the previous end position and then at offset 0; the raw
    bytes must equal a single encode() of the final text.
    """
    path = support.TESTFN
    for codec in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=codec) as f:
            f.write('aaa')
            end_cookie = f.tell()
        with self.open(path, 'r+', encoding=codec) as f:
            f.seek(end_cookie)
            f.write('zzz')
            f.seek(0)
            f.write('bbb')
        with self.open(path, 'rb') as f:
            self.assertEqual(f.read(), 'bbbzzz'.encode(codec))
Antoine Pitroue4501852009-05-14 18:55:55 +00002328
def test_errors_property(self):
    """The ``errors`` attribute is "strict" by default and otherwise
    reflects the value passed to open()."""
    cases = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for extra_kwargs, expected in cases:
        with self.open(support.TESTFN, "w", **extra_kwargs) as f:
            self.assertEqual(f.errors, expected)
2334
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    """Issue6750: concurrent writes must not duplicate data.

    Twenty threads each write one distinct line to the same line-buffered
    text file after being released together by an event; every line must
    then appear exactly once in the file.
    """
    start_signal = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def writer(n):
            text = "Thread%03d\n" % n
            start_signal.wait()
            f.write(text)
        workers = [threading.Thread(target=writer, args=(x,))
                   for x in range(20)]
        for worker in workers:
            worker.start()
        # Give the threads a moment to reach the wait() before release.
        time.sleep(0.02)
        start_signal.set()
        for worker in workers:
            worker.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
        for n in range(20):
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002356
Antoine Pitrou6be88762010-05-03 16:48:20 +00002357 def test_flush_error_on_close(self):
2358 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2359 def bad_flush():
2360 raise IOError()
2361 txt.flush = bad_flush
2362 self.assertRaises(IOError, txt.close) # exception not swallowed
2363
2364 def test_multi_close(self):
2365 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2366 txt.close()
2367 txt.close()
2368 txt.close()
2369 self.assertRaises(ValueError, txt.flush)
2370
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002371 def test_unseekable(self):
2372 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2373 self.assertRaises(self.UnsupportedOperation, txt.tell)
2374 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2375
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002376 def test_readonly_attributes(self):
2377 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2378 buf = self.BytesIO(self.testdata)
2379 with self.assertRaises(AttributeError):
2380 txt.buffer = buf
2381
def test_rawio(self):
    """Issue #12591: TextIOWrapper must work with raw I/O objects.

    This is what lets subprocess.Popen() have the required unbuffered
    semantics with universal_newlines=True.  The raw mock hands out its
    data in several pieces; read(), readline() and iteration must stitch
    them together correctly.
    """
    pieces = [b'abc', b'def', b'ghi\njkl\nopq\n']
    raw = self.MockRawIO(pieces)
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
    # Reads
    self.assertEqual(wrapper.read(4), 'abcd')
    self.assertEqual(wrapper.readline(), 'efghi\n')
    self.assertEqual(list(wrapper), ['jkl\n', 'opq\n'])
2392
def test_rawio_write_through(self):
    """Issue #12591: with write_through=True, writes don't need a flush.

    After three write() calls and no explicit flush, the concatenation of
    the mock's ``_write_stack`` must already hold everything written.
    """
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                                 write_through=True)
    for chunk in ('1', '23\n4', '5'):
        wrapper.write(chunk)
    self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2402
class CTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapperTest variant with extra checks described in the code as
    applying to the C TextIOWrapper (re-initialisation and finalisation)."""

    def test_initialization(self):
        """A failed re-__init__ leaves the wrapper unusable for reading."""
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        bad_newlines = ((TypeError, 42), (ValueError, 'xyzzy'))
        for expected_error, newline in bad_newlines:
            self.assertRaises(expected_error, wrapper.__init__, buffered,
                              newline=newline)
            self.assertRaises(ValueError, wrapper.read)

    def test_garbage_collection(self):
        """A wrapper caught in a reference cycle is collected and flushed."""
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        rawio = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(rawio)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        wrapper.x = wrapper  # self-reference: only the GC can free it
        wr = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertTrue(wr() is None, wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        """Collecting wrappers over BufferedRWPair in a cycle must not
        crash."""
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references
            text_a.buddy = text_b
            text_b.buddy = text_a
        support.gc_collect()
2443
2444
class PyTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapperTest variant that adds no tests of its own."""
2447
2448
class IncrementalNewlineDecoderTest(unittest.TestCase):
    """Tests for the newline decoder reachable as
    ``self.IncrementalNewlineDecoder``."""

    def check_newline_decoding_utf8(self, decoder):
        """Run UTF-8 specific decoding checks against *decoder*."""
        # UTF-8 specific tests for a newline decoder
        def roundtrip(data, expected, **kwargs):
            # We exercise getstate() / setstate() as well as decode():
            # decoding again from the restored state must give the same
            # result.
            saved = decoder.getstate()
            self.assertEqual(decoder.decode(data, **kwargs), expected)
            decoder.setstate(saved)
            self.assertEqual(decoder.decode(data, **kwargs), expected)

        multibyte_steps = (
            (b'\xe8\xa2\x88', "\u8888"),

            (b'\xe8', ""),
            (b'\xa2', ""),
            (b'\x88', "\u8888"),

            (b'\xe8', ""),
            (b'\xa2', ""),
            (b'\x88', "\u8888"),

            (b'\xe8', ""),
        )
        for data, expected in multibyte_steps:
            roundtrip(data, expected)
        # A lone lead byte is pending, so finalising must fail.
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'',
                          final=True)

        decoder.reset()
        roundtrip(b'\n', "\n")
        roundtrip(b'\r', "")
        roundtrip(b'', "\n", final=True)
        roundtrip(b'\r', "\n", final=True)

        newline_steps = (
            (b'\r', ""),
            (b'a', "\na"),

            (b'\r\r\n', "\n\n"),
            (b'\r', ""),
            (b'\r', "\n"),
            (b'\na', "\na"),

            (b'\xe8\xa2\x88\r\n', "\u8888\n"),
            (b'\xe8\xa2\x88', "\u8888"),
            (b'\n', "\n"),
            (b'\xe8\xa2\x88\r', "\u8888"),
            (b'\n', "\n"),
        )
        for data, expected in newline_steps:
            roundtrip(data, expected)

    def check_newline_decoding(self, decoder, encoding):
        """Feed text to *decoder* one unit at a time and check the
        ``newlines`` attribute after each step.

        When *encoding* is None the decoder is fed characters directly;
        otherwise the text is encoded first and fed one byte at a time.
        """
        pieces = []
        if encoding is None:
            encoder = None
        else:
            encoder = codecs.getincrementalencoder(encoding)()

        def feed(text):
            if encoder is None:
                # Decode one char at a time
                units = text
            else:
                # Decode one byte at a time
                units = (bytes([b]) for b in encoder.encode(text))
            for unit in units:
                pieces.append(decoder.decode(unit))

        self.assertEqual(decoder.newlines, None)
        steps = (
            ("abc\n\r", '\n'),
            ("\nabc", ('\n', '\r\n')),
            ("abc\r", ('\n', '\r\n')),
            ("abc", ('\r', '\n', '\r\n')),
        )
        for text, seen_newlines in steps:
            feed(text)
            self.assertEqual(decoder.newlines, seen_newlines)
        feed("abc\r")
        self.assertEqual("".join(pieces), "abc\n\nabcabc\nabcabc")
        decoder.reset()
        probe = "abc"
        if encoder is not None:
            encoder.reset()
            probe = encoder.encode(probe)
        self.assertEqual(decoder.decode(probe), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        """Run the generic checks for several encodings, then the UTF-8
        specific ones."""
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc is None:
                inner = None
            else:
                inner = codecs.getincrementaldecoder(enc)()
            wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(wrapped, enc)
        inner = codecs.getincrementaldecoder("utf-8")()
        wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding_utf8(wrapped)

    def test_newline_bytes(self):
        """U+0D00 and U+0A00 must not be mistaken for newlines."""
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        for translate in (False, True):
            dec = self.IncrementalNewlineDecoder(None, translate=translate)
            self.assertEqual(dec.newlines, None)
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)
2554
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoderTest variant that adds no tests of its own."""
2557
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoderTest variant that adds no tests of its own."""
Antoine Pitrou180a3362008-12-14 16:36:46 +00002560
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002561
Guido van Rossum01a27522007-03-07 01:00:12 +00002562# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002563
Guido van Rossum5abbf752007-08-27 17:39:33 +00002564class MiscIOTest(unittest.TestCase):
2565
def tearDown(self):
    """Remove the scratch file named by support.TESTFN after each test."""
    scratch_path = support.TESTFN
    support.unlink(scratch_path)
2568
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002569 def test___all__(self):
2570 for name in self.io.__all__:
2571 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002572 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002573 if name == "open":
2574 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002575 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002576 self.assertTrue(issubclass(obj, Exception), name)
2577 elif not name.startswith("SEEK_"):
2578 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002579
def test_attributes(self):
    """Check the ``mode`` and ``name`` attributes on each layer of a file.

    Covers an unbuffered binary file, a text file opened with mode "U",
    a "w+" text file, and a second binary file object created from the
    "w+" file's descriptor with closefd=False.
    """
    # Every file is opened through a context manager so that a failed
    # assertion cannot leave it open.
    with self.open(support.TESTFN, "wb", buffering=0) as f:
        self.assertEqual(f.mode, "wb")

    with self.open(support.TESTFN, "U") as f:
        self.assertEqual(f.name, support.TESTFN)
        self.assertEqual(f.buffer.name, support.TESTFN)
        self.assertEqual(f.buffer.raw.name, support.TESTFN)
        self.assertEqual(f.mode, "U")
        self.assertEqual(f.buffer.mode, "rb")
        self.assertEqual(f.buffer.raw.mode, "rb")

    with self.open(support.TESTFN, "w+") as f:
        self.assertEqual(f.mode, "w+")
        self.assertEqual(f.buffer.mode, "rb+")  # Does it really matter?
        self.assertEqual(f.buffer.raw.mode, "rb+")

        # g shares f's descriptor but does not own it (closefd=False), so
        # closing g before f leaves the descriptor for f to close.
        with self.open(f.fileno(), "wb", closefd=False) as g:
            self.assertEqual(g.mode, "wb")
            self.assertEqual(g.raw.mode, "wb")
            self.assertEqual(g.name, f.fileno())
            self.assertEqual(g.raw.name, f.fileno())
2606
def test_io_after_close(self):
    """Every I/O method must raise ValueError once the file is closed.

    The check is repeated for text and binary files in write, read and
    update modes with several buffering settings.  Methods that exist only
    on some layers (peek, read1, readall, readinto) are checked when
    present.
    """
    # Same combinations, in the same order, for each base mode.
    open_variants = []
    for base in ("w", "r", "w+"):
        binary = base + "b"
        open_variants += [
            {"mode": base},
            {"mode": binary},
            {"mode": base, "buffering": 1},
            {"mode": base, "buffering": 2},
            {"mode": binary, "buffering": 0},
        ]
    for kwargs in open_variants:
        f = self.open(support.TESTFN, **kwargs)
        f.close()

        def rejects(method, *args):
            self.assertRaises(ValueError, method, *args)

        rejects(f.flush)
        rejects(f.fileno)
        rejects(f.isatty)
        rejects(f.__iter__)
        if hasattr(f, "peek"):
            rejects(f.peek, 1)
        rejects(f.read)
        if hasattr(f, "read1"):
            rejects(f.read1, 1024)
        if hasattr(f, "readall"):
            rejects(f.readall)
        if hasattr(f, "readinto"):
            rejects(f.readinto, bytearray(1024))
        rejects(f.readline)
        rejects(f.readlines)
        rejects(f.seek, 0)
        rejects(f.tell)
        rejects(f.truncate)
        # write() needs bytes for binary files and str for text files.
        empty = b"" if "b" in kwargs['mode'] else ""
        rejects(f.write, empty)
        rejects(f.writelines, [])
        rejects(next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002649
def test_blockingioerror(self):
    """Various BlockingIOError issues: constructor argument checking, the
    default ``characters_written`` value, and collection of an instance
    caught in a reference cycle."""
    for bad_args in ((), (1,), (1, 2, 3, 4), (1, "", None)):
        self.assertRaises(TypeError, self.BlockingIOError, *bad_args)
    err = self.BlockingIOError(1, "")
    self.assertEqual(err.characters_written, 0)

    class C(str):
        pass

    message = C("")
    err = self.BlockingIOError(1, message)
    # Build a cycle between the exception and its message.
    message.b = err
    err.c = message
    wr = weakref.ref(message)
    del message, err
    support.gc_collect()
    self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002668
2669 def test_abcs(self):
2670 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002671 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2672 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2673 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2674 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002675
def _check_abc_inheritance(self, abcmodule):
    """Check which ABCs of *abcmodule* each kind of opened file satisfies.

    An unbuffered binary file must be a RawIOBase only, a buffered binary
    file a BufferedIOBase only, and a text file a TextIOBase only; all of
    them must be IOBase instances.
    """
    layer_names = ("RawIOBase", "BufferedIOBase", "TextIOBase")
    cases = (
        ({"buffering": 0}, "wb", "RawIOBase"),
        ({}, "wb", "BufferedIOBase"),
        ({}, "w", "TextIOBase"),
    )
    for extra_kwargs, mode, own_layer in cases:
        with self.open(support.TESTFN, mode, **extra_kwargs) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            for layer in layer_names:
                layer_abc = getattr(abcmodule, layer)
                if layer == own_layer:
                    self.assertIsInstance(f, layer_abc)
                else:
                    self.assertNotIsInstance(f, layer_abc)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002692
2693 def test_abc_inheritance(self):
2694 # Test implementations inherit from their respective ABCs
2695 self._check_abc_inheritance(self)
2696
2697 def test_abc_inheritance_official(self):
2698 # Test implementations inherit from the official ABCs of the
2699 # baseline "io" module.
2700 self._check_abc_inheritance(io)
2701
def _check_warn_on_dealloc(self, *args, **kwargs):
    """Open a file with the given arguments, drop it without closing, and
    check that a ResourceWarning mentioning its repr() is emitted."""
    f = open(*args, **kwargs)
    expected_repr = repr(f)
    with self.assertWarns(ResourceWarning) as caught:
        # Drop the only reference, then force a collection.
        del f
        support.gc_collect()
    self.assertIn(expected_repr, str(caught.warning.args[0]))
2709
def test_warn_on_dealloc(self):
    """Run the dealloc-warning check for unbuffered binary, buffered
    binary and text files opened by name."""
    variants = (
        ("wb", {"buffering": 0}),
        ("wb", {}),
        ("w", {}),
    )
    for mode, extra_kwargs in variants:
        self._check_warn_on_dealloc(support.TESTFN, mode, **extra_kwargs)
2714
def _check_warn_on_dealloc_fd(self, *args, **kwargs):
    """Check dealloc warnings for files opened from a pipe descriptor.

    A file that owns its descriptor must warn when dropped unclosed; one
    opened with closefd=False must not.  Both ends of each pipe are closed
    by a registered cleanup.
    """
    pipe_fds = []

    def close_pipe_fds():
        for fd in pipe_fds:
            try:
                os.close(fd)
            except EnvironmentError as e:
                # EBADF is tolerated: the descriptor may already have been
                # closed by the file object that owned it.
                if e.errno != errno.EBADF:
                    raise

    self.addCleanup(close_pipe_fds)
    read_fd, write_fd = os.pipe()
    pipe_fds.extend((read_fd, write_fd))
    self._check_warn_on_dealloc(read_fd, *args, **kwargs)
    # When using closefd=False, there's no warning
    read_fd, write_fd = os.pipe()
    pipe_fds.extend((read_fd, write_fd))
    with warnings.catch_warnings(record=True) as recorded:
        open(read_fd, *args, closefd=False, **kwargs)
        support.gc_collect()
    self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00002735
def test_warn_on_dealloc_fd(self):
    # Same three flavours as test_warn_on_dealloc, but reading from a
    # descriptor: unbuffered binary, buffered binary and text.
    variants = (
        ("rb", {"buffering": 0}),
        ("rb", {}),
        ("r", {}),
    )
    for mode, extra_kwargs in variants:
        self._check_warn_on_dealloc_fd(mode, **extra_kwargs)
2739 self._check_warn_on_dealloc_fd("r")
2740
2741
def test_pickling(self):
    # Pickling file objects is forbidden, for every open mode and every
    # pickle protocol.
    open_kwargs = []
    # Keep the original ordering: all "w" variants, then "r", then "w+".
    for base_mode in ("w", "r", "w+"):
        binary_mode = base_mode + "b"
        open_kwargs.append({"mode": base_mode})
        open_kwargs.append({"mode": binary_mode})
        open_kwargs.append({"mode": binary_mode, "buffering": 0})
    protocols = range(pickle.HIGHEST_PROTOCOL + 1)
    for kwargs in open_kwargs:
        for protocol in protocols:
            with self.open(support.TESTFN, **kwargs) as f:
                self.assertRaises(TypeError, pickle.dumps, f, protocol)
2758
@unittest.skipUnless(fcntl, 'fcntl required for this test')
def test_nonblock_pipe_write_bigbuf(self):
    # 16 KiB buffer for the buffered pipe objects.
    bufsize = 16 * 1024
    self._test_nonblock_pipe_write(bufsize)
2762
@unittest.skipUnless(fcntl, 'fcntl required for this test')
def test_nonblock_pipe_write_smallbuf(self):
    # 1 KiB buffer for the buffered pipe objects.
    bufsize = 1024
    self._test_nonblock_pipe_write(bufsize)
2766
def _set_non_blocking(self, fd):
    """Set O_NONBLOCK on *fd*, asserting that both fcntl() calls succeed."""
    current_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    self.assertNotEqual(current_flags, -1)
    status = fcntl.fcntl(fd, fcntl.F_SETFL, current_flags | os.O_NONBLOCK)
    self.assertEqual(status, 0)
2772
def _test_nonblock_pipe_write(self, bufsize):
    """Write through a non-blocking pipe and check that nothing is lost.

    Both ends of a pipe are made non-blocking and wrapped in buffered
    objects whose buffer size is *bufsize*.  Messages are written until
    the writer raises BlockingIOError; whatever was accepted must come
    out of the reader unchanged, and both objects must be closed at the
    end.
    """
    sent = []
    received = []
    r, w = os.pipe()
    self._set_non_blocking(r)
    self._set_non_blocking(w)

    # To exercise all code paths in the C implementation we need
    # to play with buffer sizes.  For instance, if we choose a
    # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
    # then we will never get a partial write of the buffer.
    rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
    wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

    with rf, wf:
        for N in 9999, 73, 7574:
            try:
                i = 0
                while True:
                    # A run of N identical lowercase letters, cycling a..z.
                    msg = bytes([i % 26 + 97]) * N
                    sent.append(msg)
                    wf.write(msg)
                    i += 1

            except self.BlockingIOError as e:
                self.assertEqual(e.args[0], errno.EAGAIN)
                # Only the part reported as written counts as sent.
                sent[-1] = sent[-1][:e.characters_written]
                received.append(rf.read())
                msg = b'BLOCKED'
                wf.write(msg)
                sent.append(msg)

        # Keep draining the reader until the writer's buffer is flushed.
        while True:
            try:
                wf.flush()
                break
            except self.BlockingIOError as e:
                self.assertEqual(e.args[0], errno.EAGAIN)
                self.assertEqual(e.characters_written, 0)
                received.append(rf.read())

        # Collect what is left until read() returns None.
        received += iter(rf.read, None)

    sent, received = b''.join(sent), b''.join(received)
    # assertEqual (rather than assertTrue(a == b)) reports the differing
    # payloads when the comparison fails.
    self.assertEqual(sent, received)
    self.assertTrue(wf.closed)
    self.assertTrue(rf.closed)
2819 self.assertTrue(rf.closed)
2820
class CMiscIOTest(MiscIOTest):
    # Run the shared MiscIOTest cases with the "io" module bound as `io`.
    io = io
2823
class PyMiscIOTest(MiscIOTest):
    # Run the shared MiscIOTest cases with the "pyio" module bound as `io`.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002826
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002827
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Checks of how the io layers behave when SIGALRM interrupts a call.

    Subclasses supply the ``io`` attribute naming the implementation that
    is tested.  Every check_* helper cancels its pending alarm on the way
    out so that a SIGALRM cannot fire during cleanup or in a later test.
    """

    def setUp(self):
        # Install the default handler and remember the previous one.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Deliberately raises ZeroDivisionError from within the handler.
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
                     'issue #12429: skip test on FreeBSD <= 7')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter."""
        read_results = []
        def _read():
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # Bound up front so the cleanup below works even if open() fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            signal.alarm(1)
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed above.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            self.assertRaises(ZeroDivisionError,
                              wio.write, item * (1024 * 1024))
            t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            # Cancel the alarm in case we got here before it fired.
            signal.alarm(0)
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check a write() issued from a signal handler that interrupts
        another write() on the same object: either the reentrant call
        fails with RuntimeError or the handler's ZeroDivisionError
        propagates."""
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            # Cancel the alarm in case we got here before it fired.
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully."""
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound up front so the cleanup below works even if open() fails.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            # Cancel the alarm in case we got here before it fired.
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully."""
        select = support.import_module("select")
        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = 1024 * 1024
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        def _read():
            while not write_finished:
                while r in select.select([r], [], [], 1.0)[0]:
                    s = os.read(r, 1024)
                    read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: hand over to alarm2 and re-arm the timer.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            # Second alarm: start the reader so the write can complete.
            t.start()
        signal.signal(signal.SIGALRM, alarm1)
        # Bound up front so the cleanup below works even if open() fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            self.assertEqual(N, wio.write(item * N))
            wio.flush()
            write_finished = True
            t.join()
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            # Cancel any alarm that is still pending.
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3016
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003017
class CSignalsTest(SignalsTest):
    # Run the shared SignalsTest cases against the "io" module.
    io = io
3020
class PySignalsTest(SignalsTest):
    # Run the shared SignalsTest cases against the "pyio" module.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = test_reentrant_write_text = None
3028
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003029
def test_main():
    """Attach the io namespaces to the test classes, then run them all.

    Classes whose name starts with "C" receive the names taken from
    ``io``; those starting with "Py" receive the names taken from
    ``pyio``.  Other classes are left untouched.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    exported_names = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {}
    py_io_ns = {}
    for exported in exported_names:
        c_io_ns[exported] = getattr(io, exported)
        py_io_ns[exported] = getattr(pyio, exported)
    module_globals = globals()
    for mock in mocks:
        # Each mock is looked up under its "C"- and "Py"-prefixed names.
        c_io_ns[mock.__name__] = module_globals["C" + mock.__name__]
        py_io_ns[mock.__name__] = module_globals["Py" + mock.__name__]
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            namespace = c_io_ns
        elif test.__name__.startswith("Py"):
            namespace = py_io_ns
        else:
            continue
        for name, obj in namespace.items():
            setattr(test, name, obj)

    support.run_unittest(*tests)

if __name__ == "__main__":
    test_main()