blob: 95ecd90a8ecf8fcceb510bd767e4d1d1b656cc49 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import random
Guido van Rossum28524c72007-02-27 05:47:44 +000027import unittest
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000028import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000029import abc
Antoine Pitroub46b9d52010-08-21 19:09:32 +000030import signal
31import errno
Antoine Pitroue033e062010-10-29 10:38:18 +000032import warnings
Antoine Pitrou243757e2010-11-05 21:15:39 +000033import pickle
Georg Brandl1b37e872010-03-14 10:45:50 +000034from itertools import cycle, count
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000035from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000036from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000037
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000038import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000039import io # C implementation of io
40import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000041try:
42 import threading
43except ImportError:
44 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010045try:
46 import fcntl
47except ImportError:
48 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000049
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000050def _default_chunk_size():
51 """Get the default TextIOWrapper chunk size"""
52 with open(__file__, "r", encoding="latin1") as f:
53 return f._CHUNK_SIZE
54
55
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto().

    *read_stack* is a sequence of scripted results handed out in order by
    readinto(): bytes-like chunks, or None (which readinto() returns as-is).
    Everything passed to write() is recorded in ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        # Total readinto() calls, and how many of them found the stack empty.
        self._reads = 0
        self._extraneous_reads = 0

    def write(self, b):
        """Record a bytes copy of *b* and report all of it as written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        # Arbitrary constant; there is no real descriptor behind the mock.
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the next scripted chunk into *buf*.

        Returns the number of bytes stored, None when the next scripted
        entry is None, or 0 once the script is exhausted.  A chunk larger
        than *buf* is split: the part that does not fit stays queued for
        the next call.
        """
        self._reads += 1
        capacity = len(buf)
        if not self._read_stack:
            self._extraneous_reads += 1
            return 0
        chunk = self._read_stack[0]
        if chunk is None:
            del self._read_stack[0]
            return None
        size = len(chunk)
        if size > capacity:
            # Fill the whole buffer and keep the remainder at the front.
            buf[:] = chunk[:capacity]
            self._read_stack[0] = chunk[capacity:]
            return capacity
        del self._read_stack[0]
        buf[:size] = chunk
        return size

    def truncate(self, pos=None):
        # Nothing is resized; the requested position is simply echoed back.
        return pos
111
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead on top of the C implementation's RawIOBase."""


class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead on top of the Python (_pyio) RawIOBase."""
117
118
class MockRawIO(MockRawIOWithoutRead):
    """Scripted raw stream that also provides its own read().

    read() ignores the requested size and hands back the next entry of the
    read stack unchanged.
    """

    def read(self, n=None):
        """Pop and return the next scripted entry, or b"" once exhausted.

        *n* is accepted but not used.  Every call increments ``_reads``;
        calls made after the stack ran dry also increment
        ``_extraneous_reads``.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted stack means EOF.  A bare ``except`` here
            # would also swallow KeyboardInterrupt and genuine bugs.
            self._extraneous_reads += 1
            return b""
128
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO on top of the C implementation's RawIOBase."""


class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO on top of the Python (_pyio) RawIOBase."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000134
Guido van Rossuma9e20242007-03-08 00:43:48 +0000135
class MisbehavedRawIO(MockRawIO):
    """Raw stream mock whose methods deliberately return bogus results."""

    def write(self, b):
        # Claim twice as many bytes as the parent actually recorded.
        written = super().write(b)
        return written * 2

    def read(self, n=None):
        # Hand back the parent's result repeated twice.
        data = super().read(n)
        return data * 2

    def seek(self, pos, whence):
        # A negative position, whatever was requested.
        return -123

    def tell(self):
        # A negative position as well.
        return -456

    def readinto(self, buf):
        # Let the parent fill the buffer, then report five times its size.
        super().readinto(buf)
        return len(buf) * 5
152
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO on top of the C implementation's RawIOBase."""


class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO on top of the Python (_pyio) RawIOBase."""
158
159
class CloseFailureIO(MockRawIO):
    """Raw stream mock whose first close() call raises IOError."""

    # Flips to 1 on the first close(); shadows the base class's ``closed``.
    closed = 0

    def close(self):
        # Later calls are silent no-ops.
        if self.closed:
            return
        # Mark as closed *before* failing so the error is raised only once.
        self.closed = 1
        raise IOError
167
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO on top of the C implementation's RawIOBase."""


class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO on top of the Python (_pyio) RawIOBase."""
173
174
class MockFileIO:
    """Mixin that logs the size of every read()/readinto() result.

    It must be combined with a class providing __init__(data), read() and
    readinto(); the sizes (or None) accumulate in ``read_history``.
    """

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        result = super().read(n)
        if result is None:
            size = None
        else:
            size = len(result)
        self.read_history.append(size)
        return result

    def readinto(self, b):
        # readinto() already returns a count (or None), so log it directly.
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000190
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO on top of the C implementation's BytesIO."""


class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO on top of the Python (_pyio) BytesIO."""
196
197
class MockUnseekableIO:
    """Mixin that makes a stream report itself as not seekable.

    The concrete class must provide an ``UnsupportedOperation`` attribute
    holding the exception type that seek() and tell() raise.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        error_type = self.UnsupportedOperation
        raise error_type("not seekable")

    def tell(self, *args):
        error_type = self.UnsupportedOperation
        raise error_type("not seekable")
207
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """MockUnseekableIO on top of the C implementation's BytesIO."""

    UnsupportedOperation = io.UnsupportedOperation


class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """MockUnseekableIO on top of the Python (_pyio) BytesIO."""

    UnsupportedOperation = pyio.UnsupportedOperation
213
214
class MockNonBlockWriterIO:
    """Writable raw-stream mock that can refuse data at a chosen byte.

    Written data accumulates in ``_write_stack`` until pop_written()
    collects it.  After block_on(char), write() stops in front of the
    first occurrence of *char*.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and forget it."""
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Record *b*, honouring the blocker set by block_on().

        Returns the number of bytes accepted, or None when the data starts
        with the blocker; in that case nothing is recorded and the blocker
        is cancelled.
        """
        payload = bytes(b)
        blocker = self._blocker_char
        # -1 means "no blocker set" or "blocker not present in the data".
        position = payload.find(blocker) if blocker else -1
        if position > 0:
            # write data up to the first blocker
            self._write_stack.append(payload[:position])
            return position
        if position == 0:
            # cancel blocker and indicate would block
            self._blocker_char = None
            return None
        self._write_stack.append(payload)
        return len(payload)
258
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO on top of the C implementation's RawIOBase."""

    BlockingIOError = io.BlockingIOError


class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO on top of the Python (_pyio) RawIOBase."""

    BlockingIOError = pyio.BlockingIOError
264
Guido van Rossuma9e20242007-03-08 00:43:48 +0000265
class IOTest(unittest.TestCase):
    """Implementation-independent tests for open() and the basic io types.

    The class under test is always reached through an attribute of ``self``
    (``self.open``, ``self.FileIO``, ``self.BytesIO``, ``self.IOBase``,
    ``self.UnsupportedOperation``, ``self.SEEK_CUR``, the mock classes, ...).
    None of those attributes is defined in this class.
    NOTE(review): presumably they are attached to the concrete C/Python
    subclasses elsewhere in this file -- confirm.

    Test methods carry ``#`` comments rather than docstrings on purpose:
    unittest prints a test's docstring instead of its name.
    """

    def setUp(self):
        # Start every test without a leftover scratch file.
        support.unlink(support.TESTFN)

    def tearDown(self):
        # Remove whatever scratch file the test created.
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        """Run a fixed write/seek/truncate sequence on the binary stream *f*.

        Every return value is checked.  The checks assume *f* starts out
        empty; test_raw_bytes_io verifies that the stream then contains
        b"hello world\\n".
        """
        self.assertEqual(f.write(b"blah."), 5)
        f.truncate(0)
        # truncate() must not move the stream position.
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        # Again, truncating leaves the position where it was.
        self.assertEqual(f.tell(), 13)
        # Float offsets are rejected rather than silently truncated.
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Run a fixed read/readinto/seek sequence on the binary stream *f*.

        The checks assume *f* holds b"hello world\\n" and is positioned at
        its start.  With *buffered* true, argument-less read() calls are
        checked as well.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        # Only two bytes are left: a short readinto() keeps the buffer size.
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset the large-file checks seek to.
    LARGE = 2**31

    def large_file_ops(self, f):
        """Seek, write and truncate around the LARGE offset of *f*.

        *f* must be readable and writable.
        """
        # assertTrue() instead of a bare ``assert``: the latter is compiled
        # away under -O and would silently skip the precondition check.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        # Truncating below the current position does not move it.
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        exc = self.UnsupportedOperation
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(exc, fp.read)
                self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "wb", buffering=0) as fp:
            self.assertRaises(exc, fp.read)
            self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "rb", buffering=0) as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(exc, fp.write, "blah")
            self.assertRaises(exc, fp.writelines, ["blah\n"])
            # Non-zero seeking from current or end pos
            self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
            self.assertRaises(exc, fp.seek, -1, self.SEEK_END)

    def test_open_handles_NUL_chars(self):
        # A NUL inside the file name (str or bytes) is rejected with
        # TypeError.
        fn_with_NUL = 'foo\0bar'
        self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
        self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')

    def test_raw_file_io(self):
        # Unbuffered binary files: capability flags plus the shared
        # write/read sequences.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        # Same as test_raw_file_io, with the default buffering.
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        # readline() with and without a size limit, including a line that
        # contains a NUL byte; non-integer limits raise TypeError.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        # The shared write/read sequences against an in-memory BytesIO.
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                # Report a real skip instead of printing and returning, so
                # the runner does not count an unrun test as passed.
                self.skipTest(
                    "large file ops skipped on %s: requires %d bytes and a "
                    "long time; use 'regrtest.py -u largefile test_io' to "
                    "run it" % (sys.platform, self.LARGE))
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # The file is closed on leaving the with block, normally or
        # through an exception, for several buffer sizes.
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        # In append mode tell() starts at the end of the existing data.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            # assertGreater reports the offending value on failure.
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        # Collecting an unclosed FileIO subclass calls __del__, close()
        # and flush(), in that order, and the data reaches the file.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        with support.check_warnings(('', ResourceWarning)):
            f = MyFileIO(support.TESTFN, "wb")
            f.write(b"xxx")
            del f
            support.gc_collect()
            self.assertEqual(record, [1, 2, 3])
            with self.open(support.TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        """Check that destroying a *base* subclass instance runs __del__,
        close() and flush() in that order, with instance attributes still
        available to each of them."""
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        # Data written through a buffered file is on disk once it is closed.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        # write() accepts an array and reports its size in bytes, not items.
        a = array.array('i', range(10))
        n = len(a.tobytes())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        # closefd=False together with a file name is an error.
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        # Reading from a closed wrapper raises ValueError, even though it
        # was opened with closefd=False.
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        # The raw layer's closefd attribute mirrors the closefd argument.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        with support.check_warnings(('', ResourceWarning)):
            f = self.FileIO(support.TESTFN, "wb")
            f.write(b"abcxxx")
            # The self-reference makes a cycle, so only the cyclic garbage
            # collector can free the object.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed

    def test_multi_close(self):
        # close() may be called repeatedly; flush() on a closed file raises.
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")

    def test_types_have_dict(self):
        # Instances of these io types accept arbitrary attributes.
        test = (
            self.IOBase(),
            self.RawIOBase(),
            self.TextIOBase(),
            self.StringIO(),
            self.BytesIO()
        )
        for obj in test:
            self.assertTrue(hasattr(obj, "__dict__"))

    def test_fileio_closefd(self):
        # Issue #4841
        with self.open(__file__, 'rb') as f1, \
             self.open(__file__, 'rb') as f2:
            fileio = self.FileIO(f1.fileno(), closefd=False)
            # .__init__() must not close f1
            fileio.__init__(f2.fileno(), closefd=False)
            f1.readline()
            # .close() must not close f2
            fileio.close()
            f2.readline()
648
649
class CIOTest(IOTest):
    """IOTest variant that adds a regression test tied to the C
    implementation's finalizer (see issue #12149 below)."""

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        obj.obj = obj
        wr = weakref.ref(obj)
        # Drop the class and the instance so that both are only reachable
        # through the cycle when the collector runs.
        del MyIO
        del obj
        support.gc_collect()
        # assertIsNone reports the surviving object on failure.
        self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000669
class PyIOTest(IOTest):
    """IOTest variant with no tests of its own."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000672
Guido van Rossuma9e20242007-03-08 00:43:48 +0000673
class CommonBufferedTests:
    # Checks shared by the BufferedReader, BufferedWriter and BufferedRandom
    # test cases.  ``self.tp`` is the buffered class under test; the mock raw
    # stream classes (``self.MockRawIO`` and friends) are looked up on the
    # test instance and are expected to be provided elsewhere.

    def test_detach(self):
        # detach() hands back the raw stream; detaching twice is an error.
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        self.assertIs(buffered.detach(), raw_stream)
        with self.assertRaises(ValueError):
            buffered.detach()

    def test_fileno(self):
        # The buffered object is expected to report 42 (presumably the value
        # the mock raw stream returns from fileno()).
        buffered = self.tp(self.MockRawIO())

        self.assertEqual(42, buffered.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        buffered = self.tp(self.MockRawIO())
        # Invalid whence
        for bad_whence in (-1, 3):
            with self.assertRaises(ValueError):
                buffered.seek(0, bad_whence)

    def test_override_destructor(self):
        # A subclass overriding __del__, close() and flush() must see them
        # called in that order when the object is collected.
        base = self.tp
        calls = []

        class MyBufferedIO(base):
            def __del__(self):
                calls.append(1)
                try:
                    parent_del = super().__del__
                except AttributeError:
                    return
                parent_del()

            def close(self):
                calls.append(2)
                super().close()

            def flush(self):
                calls.append(3)
                super().flush()

        buffered = MyBufferedIO(self.MockRawIO())
        # Remember writability before dropping the only reference.
        can_write = buffered.writable()
        del buffered
        support.gc_collect()
        # flush() (3) is only expected for writable objects.
        expected = [1, 2, 3] if can_write else [1, 2]
        self.assertEqual(calls, expected)

    def test_context_manager(self):
        # Test usability as a context manager
        buffered = self.tp(self.MockRawIO())
        with buffered:
            pass
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        with self.assertRaises(ValueError):
            with buffered:
                pass

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        failing_raw = self.CloseFailureIO()

        def touch_missing_attribute():
            self.tp(failing_raw).xyzzy

        with support.captured_output("stderr") as captured:
            self.assertRaises(AttributeError, touch_missing_attribute)
        output = captured.getvalue().strip()
        if not output:
            return
        # The destructor *may* have printed an unraisable error, check it
        self.assertEqual(len(output.splitlines()), 1)
        self.assertTrue(output.startswith("Exception IOError: "), output)
        self.assertTrue(output.endswith(" ignored"), output)

    def test_repr(self):
        # repr() shows the qualified class name and, once the raw stream has
        # a ``name`` attribute, the repr of that name.
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(buffered), "<%s>" % clsname)
        raw_stream.name = "dummy"
        self.assertEqual(repr(buffered), "<%s name='dummy'>" % clsname)
        raw_stream.name = b"dummy"
        self.assertEqual(repr(buffered), "<%s name=b'dummy'>" % clsname)

    def test_flush_error_on_close(self):
        raw_stream = self.MockRawIO()

        def bad_flush():
            raise IOError()

        raw_stream.flush = bad_flush
        buffered = self.tp(raw_stream)
        # exception not swallowed
        with self.assertRaises(IOError):
            buffered.close()

    def test_multi_close(self):
        # close() may be called repeatedly; flush() afterwards is an error.
        buffered = self.tp(self.MockRawIO())
        for _ in range(3):
            buffered.close()
        with self.assertRaises(ValueError):
            buffered.flush()

    def test_unseekable(self):
        buffered = self.tp(self.MockUnseekableIO(b"A" * 10))
        with self.assertRaises(self.UnsupportedOperation):
            buffered.tell()
        with self.assertRaises(self.UnsupportedOperation):
            buffered.seek(0)

    def test_readonly_attributes(self):
        # The ``raw`` attribute cannot be rebound.
        buffered = self.tp(self.MockRawIO())
        replacement = self.MockRawIO()
        self.assertRaises(AttributeError, setattr, buffered, "raw",
                          replacement)
793
Guido van Rossum78892e42007-04-06 17:31:18 +0000794
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Reader-side tests; subclasses bind ``tp`` to a concrete BufferedReader.
    read_mode = "rb"

    def test_constructor(self):
        # __init__ may be re-invoked with valid sizes; non-positive sizes are
        # rejected with ValueError.
        raw_stream = self.MockRawIO([b"abc"])
        reader = self.tp(raw_stream)
        reader.__init__(raw_stream)
        for size in (1024, 16):
            reader.__init__(raw_stream, buffer_size=size)
        self.assertEqual(b"abc", reader.read())
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, reader.__init__, raw_stream,
                              buffer_size=bad_size)
        raw_stream = self.MockRawIO([b"abc"])
        reader.__init__(raw_stream)
        self.assertEqual(b"abc", reader.read())

    def test_read(self):
        for size in (None, 7):
            raw_stream = self.MockRawIO((b"abc", b"d", b"efg"))
            reader = self.tp(raw_stream)
            self.assertEqual(b"abcdefg", reader.read(size))
        # Invalid args
        with self.assertRaises(ValueError):
            reader.read(-2)

    def test_read1(self):
        raw_stream = self.MockRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw_stream)
        self.assertEqual(b"a", reader.read(1))
        # (requested size, expected data, raw reads counted afterwards)
        steps = [
            (1, b"b", 1),
            (100, b"c", 1),
            (100, b"d", 2),
            (100, b"efg", 3),
            (100, b"", 4),
        ]
        for size, expected, raw_reads in steps:
            self.assertEqual(expected, reader.read1(size))
            self.assertEqual(raw_stream._reads, raw_reads)
        # Invalid args
        with self.assertRaises(ValueError):
            reader.read1(-1)

    def test_readinto(self):
        raw_stream = self.MockRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw_stream)
        target = bytearray(2)
        # (bytes reported read, buffer contents afterwards); a short read
        # leaves the untouched tail of the buffer in place.
        steps = [(2, b"ab"), (2, b"cd"), (2, b"ef"), (1, b"gf"), (0, b"gf")]
        for count_read, contents in steps:
            self.assertEqual(reader.readinto(target), count_read)
            self.assertEqual(target, contents)

    def test_readlines(self):
        def fresh_reader():
            return self.tp(self.MockRawIO((b"abc\n", b"d\n", b"ef")))

        self.assertEqual(fresh_reader().readlines(),
                         [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(fresh_reader().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(fresh_reader().readlines(None),
                         [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each case: buffer size, sizes passed to read(), and the read sizes
        # expected in the raw stream's read_history.
        cases = [
            [100, [3, 1, 4, 8], [dlen, 0]],
            [100, [3, 3, 3], [dlen]],
            [4, [1, 2, 4, 2], [4, 4, 1]],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in cases:
            raw_stream = self.MockFileIO(data)
            reader = self.tp(raw_stream, buffer_size=bufsize)
            start = 0
            for nbytes in buf_read_sizes:
                stop = start + nbytes
                self.assertEqual(reader.read(nbytes), data[start:stop])
                start = stop
            # this is mildly implementation-dependent
            self.assertEqual(raw_stream.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        raw_stream = self.MockRawIO(
            (b"abc", b"d", None, b"efg", None, None, None))
        reader = self.tp(raw_stream)
        self.assertEqual(b"abcd", reader.read(6))
        self.assertEqual(b"e", reader.read(1))
        self.assertEqual(b"fg", reader.read())
        self.assertEqual(b"", reader.peek(1))
        self.assertIsNone(reader.read())
        self.assertEqual(b"", reader.read())

        raw_stream = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", raw_stream.readall())
        self.assertIsNone(raw_stream.readall())

    def test_read_past_eof(self):
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", reader.read(9000))

    def test_read_all(self):
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", reader.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(values)
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode,
                           buffering=0) as raw:
                reader = self.tp(raw, 8)
                errors = []
                chunks = []

                def worker():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for size in cycle([1, 19]):
                            chunk = reader.read(size)
                            if not chunk:
                                break
                            # list.append() is atomic
                            chunks.append(chunk)
                    except Exception as exc:
                        errors.append(exc)
                        raise

                threads = [threading.Thread(target=worker)
                           for _ in range(20)]
                for thread in threads:
                    thread.start()
                time.sleep(0.02)  # yield
                for thread in threads:
                    thread.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                combined = b''.join(chunks)
                for value in range(256):
                    self.assertEqual(combined.count(bytes([value])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        # tell()/seek() stay unsupported both before and after a read.
        reader = self.tp(self.MockUnseekableIO(b"A" * 10))
        with self.assertRaises(self.UnsupportedOperation):
            reader.tell()
        with self.assertRaises(self.UnsupportedOperation):
            reader.seek(0)
        reader.read(1)
        with self.assertRaises(self.UnsupportedOperation):
            reader.seek(0)
        with self.assertRaises(self.UnsupportedOperation):
            reader.tell()

    def test_misbehaved_io(self):
        reader = self.tp(self.MisbehavedRawIO((b"abc", b"d", b"efg")))
        with self.assertRaises(IOError):
            reader.seek(0)
        with self.assertRaises(IOError):
            reader.tell()

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            wanted = b"x" * n
            # Simple case: one raw read is enough to satisfy the request.
            raw_stream = self.MockRawIO([wanted])
            reader = self.tp(raw_stream, bufsize)
            self.assertEqual(reader.read(n), wanted)
            self.assertEqual(raw_stream._extraneous_reads, 0,
                "failed for {}: {} != 0".format(
                    n, raw_stream._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            raw_stream = self.MockRawIO([b"x" * (n - 1), b"x"])
            reader = self.tp(raw_stream, bufsize)
            self.assertEqual(reader.read(n), wanted)
            self.assertEqual(raw_stream._extraneous_reads, 0,
                "failed for {}: {} != 0".format(
                    n, raw_stream._extraneous_reads))
984
985
class CBufferedReaderTest(BufferedReaderTest):
    # Runs the reader tests against io.BufferedReader and adds checks that
    # only apply to that implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed re-initialization, read() must raise ValueError
        # as well.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # Opening in "w+b" mode creates support.TESTFN on disk; remove it
        # when the test finishes so it does not leak into later tests.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Self-reference: the object is only reclaimable by the cycle GC.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001026
# Runs the inherited BufferedReaderTest cases with ``tp`` bound to
# pyio.BufferedReader.
class PyBufferedReaderTest(BufferedReaderTest):
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001029
Guido van Rossuma9e20242007-03-08 00:43:48 +00001030
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Writer-side tests; subclasses bind ``tp`` to a concrete BufferedWriter.
    write_mode = "wb"

    def test_constructor(self):
        # __init__ may be re-invoked with valid sizes; non-positive sizes are
        # rejected with ValueError.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # detach() must push buffered data to the raw stream first.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # ``intermediate_func`` is called with the buffered object after
        # every write, so callers can interleave flushes, seeks, etc.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            # Clamp the final write so it does not run past the data.
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Both helpers move the position around and then restore it, once
        # with absolute seeks and once with relative seeks.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_destructor(self):
        # Dropping the last reference must flush buffered data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # The test creates support.TESTFN on disk; remove it afterwards so
        # the file does not leak into later tests.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            # The position is still 6 after truncating to 3.
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Chunks may land in any order, so only per-byte counts are
            # checked.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing a third positional argument must emit a DeprecationWarning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001246
1247
class CBufferedWriterTest(BufferedWriterTest):
    # Runs the writer tests against io.BufferedWriter and adds checks that
    # only apply to that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed re-initialization, write() must raise ValueError
        # as well.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        # Opening in "w+b" mode creates support.TESTFN on disk; remove it
        # when the test finishes so it does not leak into later tests.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Self-reference: the object is only reclaimable by the cycle GC.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1285
1286
# Runs the inherited BufferedWriterTest cases with ``tp`` bound to
# pyio.BufferedWriter.
class PyBufferedWriterTest(BufferedWriterTest):
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001289
class BufferedRWPairTest(unittest.TestCase):
    # Tests for BufferedRWPair; subclasses bind ``tp`` to the concrete class.
    # Each pair is built as tp(reader, writer).

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_detach(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_deprecation(self):
        # Passing a fourth positional argument must emit a
        # DeprecationWarning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        # None is accepted as "no size limit".
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        # A fresh pair is built for every call because readlines() consumes
        # the reader.
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        # None is accepted as "no hint"; this mirrors the read(None) check
        # in test_read instead of repeating the no-argument call verbatim.
        self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # peek() may return more than requested, hence startswith(); it must
        # not consume what the following read() returns.
        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        # The pair reports a tty when either side does.
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001407
# Runs the inherited BufferedRWPairTest cases with ``tp`` bound to
# io.BufferedRWPair.
class CBufferedRWPairTest(BufferedRWPairTest):
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001410
# Runs the inherited BufferedRWPairTest cases with ``tp`` bound to
# pyio.BufferedRWPair.
class PyBufferedRWPairTest(BufferedRWPairTest):
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001413
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001414
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Checks for BufferedRandom shared by every implementation; concrete
    # subclasses provide the class under test as `tp`.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Both parents define this test, so each one is run explicitly.
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        bufio = self.tp(raw, 8)

        self.assertEqual(b"as", bufio.read(2))
        for chunk in (b"ddd", b"eee"):
            bufio.write(chunk)
        # The writes are buffered: nothing has reached the raw stream yet.
        self.assertFalse(raw._write_stack)
        self.assertEqual(b"ghjk", bufio.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        bufio = self.tp(raw)

        self.assertEqual(b"as", bufio.read(2))
        self.assertEqual(2, bufio.tell())
        bufio.seek(0, 0)
        self.assertEqual(b"asdf", bufio.read(4))

        # Overwrite four bytes in the middle, then re-read from the start.
        bufio.write(b"123f")
        bufio.seek(0, 0)
        self.assertEqual(b"asdf123fl", bufio.read())
        self.assertEqual(9, bufio.tell())
        # Seek relative to the end, then relative to the current position.
        bufio.seek(-4, 2)
        self.assertEqual(5, bufio.tell())
        bufio.seek(2, 1)
        self.assertEqual(7, bufio.tell())
        self.assertEqual(b"fl", bufio.read(11))
        bufio.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        self.assertRaises(TypeError, bufio.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario; `read_func(bufio[, n])` performs the actual read
        # so that read(), readinto() and peek() can all be exercised.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        # Modify the raw stream behind the buffered object's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        def _read(bufio, *args):
            return bufio.read(*args)
        self.check_flush_and_read(_read)

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # A negative n means "read everything": use an oversized buffer.
            size = 9999 if n < 0 else n
            target = bytearray(size)
            filled = bufio.readinto(target)
            return bytes(target[:filled])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            data = bufio.peek(n)
            if n != -1:
                data = data[:n]
            # peek() does not advance the position; do it by hand.
            bufio.seek(len(data), 1)
            return data
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        for chunk in (b"123", b"45"):
            bufio.write(chunk)
            bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        def _peek_in_place(bufio):
            bufio.peek(1)

        def _peek_one_back(bufio):
            # Peek at the byte before the current position, then restore it.
            saved = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(saved, 0)

        self.check_writes(_peek_in_place)
        self.check_writes(_peek_one_back)

    def test_writes_and_reads(self):
        def _reread_last(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_reread_last)

    def test_writes_and_read1s(self):
        def _reread_last(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_reread_last)

    def test_writes_and_readintos(self):
        def _reread_last(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_reread_last)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            end_pos = overwrite_size + 1
            self.assertEqual(bufio.tell(), end_pos)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), end_pos)
            expected = b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size)
            self.assertEqual(raw.getvalue(), expected)

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        original = b"\x80\x81\x82\x83\x84"
        size = len(original)
        for i in range(size):
            for j in range(i, size):
                raw = self.BytesIO(original)
                bufio = self.tp(raw, 100)
                mutate(bufio, i, j)
                bufio.flush()
                # Apply the same two writes, in the same order, to a copy.
                expected = bytearray(original)
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as raw, self.tp(raw, 100) as f:
            f.write(b"1")
            self.assertEqual(f.read(1), b'b')
            f.write(b'2')
            self.assertEqual(f.read1(1), b'd')
            f.write(b'3')
            one_byte = bytearray(1)
            f.readinto(one_byte)
            self.assertEqual(one_byte, b'f')
            f.write(b'4')
            self.assertEqual(f.peek(1), b'h')
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as raw, self.tp(raw, 100) as f:
            self.assertEqual(f.read(1), b'a')
            f.write(b"2")
            self.assertEqual(f.read(1), b'c')
            f.flush()
            self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        with self.BytesIO(b'ab\ncdef\ng\n') as raw, self.tp(raw) as f:
            f.write(b'1')
            self.assertEqual(f.readline(), b'b\n')
            f.write(b'2')
            self.assertEqual(f.readline(), b'def\n')
            f.write(b'3')
            self.assertEqual(f.readline(), b'\n')
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None
1638
class CBufferedRandomTest(BufferedRandomTest):
    # Run the shared BufferedRandom tests against the `io` module's class.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Re-initialising with an absurd buffer size must fail cleanly.
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            bufio.__init__(rawio, sys.maxsize)

    def test_garbage_collection(self):
        # Run the garbage-collection checks of both C parent test classes.
        CBufferedReaderTest.test_garbage_collection(self)
        CBufferedWriterTest.test_garbage_collection(self)
1655
class PyBufferedRandomTest(BufferedRandomTest):
    # Run the shared BufferedRandom tests against the `pyio` module's class.
    tp = pyio.BufferedRandom
1658
1659
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001660# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1661# properties:
1662# - A single output character can correspond to many bytes of input.
1663# - The number of input bytes to complete the character can be
1664# undetermined until the last input byte is received.
1665# - The number of input bytes can vary depending on previous input.
1666# - A single input byte can correspond to many characters of output.
1667# - The number of output characters can be undetermined until the
1668# last input byte is received.
1669# - The number of output characters can vary depending on previous input.
1670
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    Stateful, buffering decoder for exercising seek/tell behavior.

    The input is a stream of words.  While I (the input length) is
    non-zero, a word is exactly I bytes long; when I is 0, a word is
    terminated by a period and periods that do not end a word are
    ignored.  Words are handled as follows:
      - 'i' followed by a number sets I (capped at 99; no number means 0).
      - 'o' followed by a number sets O, the output length (same rules).
      - Any other word is emitted followed by a period.  If O is non-zero
        the word is first padded with hyphens or truncated to exactly O
        characters; if O is 0 it is emitted verbatim.
    I and O both start at 1.  EOF (final=True) also terminates the last
    word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        # Both lengths are XORed with 1 so that flags == 0 after reset().
        flags = (self.i ^ 1) * 100 + (self.o ^ 1)
        return bytes(self.buffer), flags

    def setstate(self, state):
        pending, flags = state
        self.buffer = bytearray(pending)
        packed_i, packed_o = divmod(flags, 100)
        self.i = packed_i ^ 1
        self.o = packed_o ^ 1

    def decode(self, input, final=False):
        pieces = []
        period = ord('.')
        for byte in input:
            # self.i is re-read on every byte: process_word() may change it.
            if self.i == 0:
                # variable-length, terminated with period
                if byte != period:
                    self.buffer.append(byte)
                elif self.buffer:
                    pieces.append(self.process_word())
            else:
                # fixed-length, terminate after self.i bytes
                self.buffer.append(byte)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer:
            # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        marker = self.buffer[0]
        if marker == ord('i'):
            # set input length
            self.i = min(99, int(self.buffer[1:] or 0))
            output = ''
        elif marker == ord('o'):
            # set output length
            self.o = min(99, int(self.buffer[1:] or 0))
            output = ''
        else:
            output = self.buffer.decode('ascii')
            if self.o:
                # pad out with hyphens, then truncate to output length
                output = output.ljust(self.o, '-')[:self.o]
            output += '.'
        self.buffer = bytearray()
        return output

    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        # Codec search function: answers only for 'test_decoder', and only
        # once a test has switched codecEnabled on.
        if not (cls.codecEnabled and name == 'test_decoder'):
            return None
        latin1 = codecs.lookup('latin-1')
        return codecs.CodecInfo(
            name='test_decoder', encode=latin1.encode, decode=None,
            incrementalencoder=None,
            streamreader=None, streamwriter=None,
            incrementaldecoder=cls)
1755
# Make the test codec discoverable through the codec registry.  The search
# function returns None until a test sets
# StatefulIncrementalDecoder.codecEnabled, so the codec is off by default.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1759
1760
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Sanity checks for the StatefulIncrementalDecoder helper itself.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.'
         'b----------------------------.'
         'cde--------------------------.'
         'abcdefghijabcde.'
         'a.b------------.'
         '.c.------------.'
         'd.e------------.'
         'k--------------.'
         'l--------------.'
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for data, eof, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(data, eof), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001803
1804class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001805
def setUp(self):
    # Start every test without a leftover scratch file.
    support.unlink(support.TESTFN)
    # Sample bytes mixing \r\n, \r and \n line endings, and the text they
    # correspond to once every ending is normalized to "\n".
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = "AAA\nBBB\nCCC\nDDD\nEEE\n"
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001810
def tearDown(self):
    # Remove the scratch file in case the test created it.
    support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001813
def test_constructor(self):
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    text = self.TextIOWrapper(buffered)

    # Re-running __init__ must replace the previous settings.
    text.__init__(buffered, encoding="latin1", newline="\r\n")
    self.assertEqual(text.encoding, "latin1")
    self.assertEqual(text.line_buffering, False)

    text.__init__(buffered, encoding="utf8", line_buffering=True)
    self.assertEqual(text.encoding, "utf8")
    self.assertEqual(text.line_buffering, True)
    self.assertEqual("\xe9\n", text.readline())

    # Invalid `newline` arguments: wrong type, then unsupported value.
    with self.assertRaises(TypeError):
        text.__init__(buffered, newline=42)
    with self.assertRaises(ValueError):
        text.__init__(buffered, newline='xyzzy')
1827
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)

    # detach() hands back the very object that was wrapped.
    wrapper = self.TextIOWrapper(buffered)
    self.assertIs(wrapper.detach(), buffered)

    # Text still pending in the wrapper is pushed down by detach() ...
    wrapper = self.TextIOWrapper(buffered, encoding="ascii")
    wrapper.write("howdy")
    self.assertFalse(raw.getvalue())
    wrapper.detach()
    self.assertEqual(raw.getvalue(), b"howdy")
    # ... and a second detach() is an error.
    self.assertRaises(ValueError, wrapper.detach)
1840
def test_repr(self):
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    text = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    def check(template):
        # `template` has one %s placeholder for the implementing module.
        self.assertEqual(repr(text), template % modname)

    check("<%s.TextIOWrapper encoding='utf-8'>")
    # A name on the raw stream shows up in the wrapper's repr ...
    raw.name = "dummy"
    check("<%s.TextIOWrapper name='dummy' encoding='utf-8'>")
    # ... as does a mode set on the wrapper ...
    text.mode = "r"
    check("<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>")
    # ... and a bytes name is rendered with its b'' prefix.
    raw.name = b"dummy"
    check("<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>")
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001857
def test_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    text = self.TextIOWrapper(buffered, newline="\n", line_buffering=True)

    # (text written, bytes expected in the raw stream afterwards)
    steps = (
        ("X", b""),                 # No flush happened
        ("Y\nZ", b"XY\nZ"),         # All got flushed
        ("A\rB", b"XY\nZA\rB"),
    )
    for chunk, flushed in steps:
        text.write(chunk)
        self.assertEqual(raw.getvalue(), flushed)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001868
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="utf8")
    self.assertEqual(t.encoding, "utf8")
    # Without an explicit encoding a default must still be chosen.
    t = self.TextIOWrapper(b)
    # assertIsNotNone reports "unexpectedly None" on failure instead of
    # the uninformative "False is not true" of assertTrue(x is not None).
    self.assertIsNotNone(t.encoding)
    # lookup() raises LookupError if the name is not a known codec.
    codecs.lookup(t.encoding)
1877
def test_encoding_errors_reading(self):
    # b"\xff" cannot be decoded as ASCII; what happens depends on `errors`.
    payload = b"abc\n\xff\n"

    def wrap(**kwargs):
        return self.TextIOWrapper(self.BytesIO(payload), encoding="ascii",
                                  **kwargs)

    # (1) default
    self.assertRaises(UnicodeError, wrap().read)
    # (2) explicit strict
    self.assertRaises(UnicodeError, wrap(errors="strict").read)
    # (3) ignore
    self.assertEqual(wrap(errors="ignore").read(), "abc\n\n")
    # (4) replace
    self.assertEqual(wrap(errors="replace").read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001895
def test_encoding_errors_writing(self):
    # "\xff" cannot be encoded as ASCII; what happens depends on `errors`.

    # (1) default, (2) explicit strict: the write itself fails.
    for extra in ({}, {"errors": "strict"}):
        text = self.TextIOWrapper(self.BytesIO(), encoding="ascii", **extra)
        self.assertRaises(UnicodeError, text.write, "\xff")

    # (3) ignore drops the character, (4) replace substitutes b"?".
    lenient = (
        ("ignore", b"abcdef\n"),
        ("replace", b"abc?def\n"),
    )
    for handler, expected in lenient:
        raw = self.BytesIO()
        text = self.TextIOWrapper(raw, encoding="ascii", errors=handler,
                                  newline="\n")
        text.write("abc\xffdef\n")
        text.flush()
        self.assertEqual(raw.getvalue(), expected)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001919
def test_newlines(self):
    # Read the same mixed-newline text back under every `newline` setting,
    # in several encodings, and check how it is split into lines.
    input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]

    # (newline argument, lines expected when reading with it)
    tests = [
        [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
        [ '', input_lines ],
        [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
        [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
        [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        # str.encode() already returns bytes; no extra conversion needed.
        data = ''.join(input_lines).encode(encoding)
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    if do_reads:
                        # Mix read(2) with readline(): each line is
                        # rebuilt from a 2-character read plus the rest.
                        got_lines = []
                        while True:
                            c2 = textio.read(2)
                            if c2 == '':
                                break
                            self.assertEqual(len(c2), 2)
                            got_lines.append(c2 + textio.readline())
                    else:
                        got_lines = list(textio)

                    # One list comparison covers both content and length
                    # and shows the complete diff on failure.
                    self.assertEqual(got_lines, exp_lines)
Guido van Rossum78892e42007-04-06 17:31:18 +00001961
def test_newlines_input(self):
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    # The same data with every \r\n and lone \r turned into \n.
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")

    # (newline argument, lines expected from readlines())
    cases = [
        (None, normalized.decode("ascii").splitlines(True)),
        ("", testdata.decode("ascii").splitlines(True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, expected in cases:
        raw = self.BytesIO(testdata)
        text = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        self.assertEqual(text.readlines(), expected)
        # A full read() after rewinding must give the same text unsplit.
        text.seek(0)
        self.assertEqual(text.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00001977
def test_newlines_output(self):
    # Bytes expected on output for each explicit `newline` argument.
    testdict = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
    }
    # newline=None is expected to behave like the platform's os.linesep.
    tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
    pieces = ("AAA\nB", "BB\nCCC\n", "X\rY\r\nZ")
    for newline, expected in tests:
        raw = self.BytesIO()
        text = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        for piece in pieces:
            text.write(piece)
        text.flush()
        self.assertEqual(raw.closed, False)
        self.assertEqual(raw.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001995
def test_destructor(self):
    # Dropping the last reference to the wrapper must flush the pending
    # text and close the underlying stream.
    seen_at_close = []
    base = self.BytesIO

    class MyBytesIO(base):
        def close(self):
            # Record what had reached the stream at close time.
            seen_at_close.append(self.getvalue())
            base.close(self)

    raw = MyBytesIO()
    text = self.TextIOWrapper(raw, encoding="ascii")
    text.write("abc")
    del text
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002009
def test_override_destructor(self):
    # Overridden __del__, close() and flush() must be called, in that
    # order, when the object is destroyed.
    calls = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # The base class may or may not define __del__.
            try:
                parent_del = super().__del__
            except AttributeError:
                return
            parent_del()

        def close(self):
            calls.append(2)
            super().close()

        def flush(self):
            calls.append(3)
            super().flush()

    raw = self.BytesIO()
    text = MyTextIO(raw, encoding="ascii")
    del text
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2032
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()

    def trigger():
        # The temporary wrapper is destroyed while AttributeError is raised.
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, trigger)
    message = captured.getvalue().strip()
    if not message:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(message.splitlines()), 1)
    self.assertTrue(message.startswith("Exception IOError: "), message)
    self.assertTrue(message.endswith(" ignored"), message)
Guido van Rossum8358db22007-08-18 21:39:55 +00002047
Guido van Rossum9b76da62007-04-11 01:09:03 +00002048 # Systematic tests of the text I/O API
2049
def test_basic_io(self):
    # Round-trip a short string through TESTFN for a range of chunk sizes
    # and encodings, checking write/read/seek/tell along the way.
    #
    # Both files are opened in `with` blocks so that a failing assertion
    # cannot leave TESTFN open; an open handle would make tearDown's
    # unlink fail on platforms that refuse to delete open files.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Opaque position of end-of-file, reused below.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                # Append, then read the appended text back from `cookie`.
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2078
def multi_line_test(self, f, enc):
    # Helper: write lines of many lengths to `f`, remembering tell() before
    # each one, then check readline() reproduces the same (position, line)
    # pairs.  `enc` is accepted but not used here.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    sizes = (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000)

    written = []
    for size in sizes:
        # Cycle through the sample characters to build `size` characters.
        body = "".join(sample[k % len(sample)] for k in range(size))
        line = body + "\n"
        written.append((f.tell(), line))
        f.write(line)

    f.seek(0)
    read_back = []
    while True:
        pos = f.tell()
        line = f.readline()
        if not line:
            break
        read_back.append((pos, line))
    self.assertEqual(read_back, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002100
def test_telling(self):
    # tell() must report the same positions while reading lines back as
    # it did while they were written, and must refuse to work while the
    # file is being iterated with a for loop.
    #
    # The with block guarantees the file is closed even when one of the
    # assertions fails part-way through.
    with self.open(support.TESTFN, "w+", encoding="utf8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # While the for loop drives the reads, tell() raises IOError.
            self.assertRaises(IOError, f.tell)
        # Once iteration has finished, tell() works again.
        self.assertEqual(f.tell(), p2)
2120
def test_seeking(self):
    # Write two identical lines made of ASCII padding followed by a
    # multi-byte character and a newline, then check read(), tell() and
    # readline() around the end of the padding.  The padding is two
    # characters shorter than the value returned by _default_chunk_size().
    pad_len = _default_chunk_size() - 2
    text_pad = "a" * pad_len
    raw_pad = text_pad.encode("utf-8")
    # Pure ASCII: one byte per character.
    self.assertEqual(len(text_pad), len(raw_pad))
    text_tail = "\u8888\n"
    raw_line = raw_pad + text_tail.encode("utf-8")
    with self.open(support.TESTFN, "wb") as out:
        out.write(raw_line * 2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as src:
        head = src.read(pad_len)
        self.assertEqual(head, raw_pad.decode("ascii"))
        self.assertEqual(src.tell(), pad_len)
        self.assertEqual(src.readline(), text_tail)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002137
def test_seeking_too(self):
    # Regression test for a specific bug: readline() followed by tell()
    # on a one-line UTF-8 file, with the chunk size forced down to 2.
    payload = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as out:
        out.write(payload)
    with self.open(support.TESTFN, "r", encoding="utf-8") as src:
        src._CHUNK_SIZE  # Just test that it exists
        src._CHUNK_SIZE = 2
        src.readline()
        # No assertion: the call only has to complete without raising.
        src.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002148
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened in a with block so that a failing
        # assertion cannot leave a handle open on the shared test file.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    # Seeking back to the cookie must restore the
                    # decoder so the remainder decodes identically.
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.  (The loop variable is named `payload`
        # rather than `input` so the builtin is not shadowed.)
        for payload, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(payload)

        # Position each test case so that it crosses a chunk boundary.
        for payload, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(payload)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + payload, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002195
def test_encoded_writes(self):
    # Two consecutive writes through a UTF-16/UTF-32 wrapper must read
    # back as the doubled text, and the raw bytes must equal a single
    # encode() of that text.
    data = "1234567890"
    doubled = data * 2
    encodings = ("utf-16", "utf-16-le", "utf-16-be",
                 "utf-32", "utf-32-le", "utf-32-be")
    for encoding in encodings:
        raw = self.BytesIO()
        wrapper = self.TextIOWrapper(raw, encoding=encoding)
        # Check if the BOM is written only once (see issue1753).
        for _ in range(2):
            wrapper.write(data)
        # Read the whole stream twice, rewinding before each pass.
        for _ in range(2):
            wrapper.seek(0)
            self.assertEqual(wrapper.read(), doubled)
        self.assertEqual(raw.getvalue(), doubled.encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002215
def test_unreadable(self):
    # read() on a wrapper whose underlying stream reports
    # readable() == False must raise IOError.
    class UnReadable(self.BytesIO):
        def readable(self):
            return False
    wrapper = self.TextIOWrapper(UnReadable())
    with self.assertRaises(IOError):
        wrapper.read()
2222
def test_read_one_by_one(self):
    # Reading one character at a time must still collapse "\r\n" into a
    # single "\n".
    wrapper = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    # iter() with a sentinel keeps calling read(1) until it returns "".
    collected = "".join(iter(lambda: wrapper.read(1), ""))
    self.assertEqual(collected, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002232
def test_readlines(self):
    # readlines() with no hint and with None returns every line; a hint
    # of 5 stops after the second line.
    wrapper = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
    all_lines = ["AA\n", "BB\n", "CC"]
    self.assertEqual(wrapper.readlines(), all_lines)
    wrapper.seek(0)
    self.assertEqual(wrapper.readlines(None), all_lines)
    wrapper.seek(0)
    self.assertEqual(wrapper.readlines(5), ["AA\n", "BB\n"])
2240
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002241 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
def test_read_by_chunk(self):
    # make sure "\r\n" straddles 128 char boundary.
    wrapper = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
    # Keep reading 128 characters at a time until read() returns "".
    collected = "".join(iter(lambda: wrapper.read(128), ""))
    self.assertEqual(collected, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002252
def test_issue1395_1(self):
    # Reading self.testdata one character at a time must yield
    # self.normalized.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")

    # read one char at a time
    collected = "".join(iter(lambda: wrapper.read(1), ""))
    self.assertEqual(collected, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002264
def test_issue1395_2(self):
    # Same as the one-character variant, but with both the chunk size
    # and the read size set to 4.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    collected = "".join(iter(lambda: wrapper.read(4), ""))
    self.assertEqual(collected, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002276
def test_issue1395_3(self):
    # Mix two fixed-size reads with three readline() calls; the
    # concatenation must equal self.normalized.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    pieces = [wrapper.read(4), wrapper.read(4)]
    for _ in range(3):
        pieces.append(wrapper.readline())
    self.assertEqual("".join(pieces), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002287
def test_issue1395_4(self):
    # A 4-character read followed by read-to-end must equal
    # self.normalized.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    head = wrapper.read(4)
    rest = wrapper.read()
    self.assertEqual(head + rest, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002295
def test_issue1395_5(self):
    # After reading 4 characters, seeking away and back to the saved
    # tell() position must resume at the same place.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # Advance past the first 4 characters; the text itself is not needed.
    wrapper.read(4)
    saved = wrapper.tell()
    wrapper.seek(0)
    wrapper.seek(saved)
    self.assertEqual(wrapper.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002305
def test_issue2282(self):
    # The wrapper must report the same seekable() value as the stream
    # it wraps.
    raw = self.BytesIO(self.testdata)
    wrapper = self.TextIOWrapper(raw, encoding="ascii")

    self.assertEqual(raw.seekable(), wrapper.seekable())
2311
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            # The returned position is not used in this test, so it is
            # no longer bound to a local; tell() is still called so the
            # sequence of stream operations is unchanged.
            f.tell()
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaa'.encode(charset))

        with self.open(filename, 'a', encoding=charset) as f:
            f.write('xxx')
        with self.open(filename, 'rb') as f:
            # A single encode() of the whole text carries one BOM, so
            # equality shows that the append did not add a second one.
            self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002326
def test_seek_bom(self):
    # Same test, but when seeking manually
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as writer:
            writer.write('aaa')
            end_pos = writer.tell()
        with self.open(path, 'r+', encoding=charset) as updater:
            # Append at the saved end position first, then overwrite the
            # start of the file.
            updater.seek(end_pos)
            updater.write('zzz')
            updater.seek(0)
            updater.write('bbb')
        with self.open(path, 'rb') as reader:
            self.assertEqual(reader.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002341
def test_errors_property(self):
    # The errors attribute is "strict" when nothing is passed, and
    # otherwise reflects the errors= argument given to open().
    cases = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for extra, expected in cases:
        with self.open(support.TESTFN, "w", **extra) as f:
            self.assertEqual(f.errors, expected)
2347
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    start_signal = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def run(n):
            text = "Thread%03d\n" % n
            # Block until every worker has been started.
            start_signal.wait()
            f.write(text)
        workers = [threading.Thread(target=run, args=(x,))
                   for x in range(20)]
        for worker in workers:
            worker.start()
        time.sleep(0.02)
        start_signal.set()
        for worker in workers:
            worker.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
        # Each worker's line must appear exactly once.
        for n in range(20):
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002369
def test_flush_error_on_close(self):
    # An IOError raised by flush() must propagate out of close().
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    def bad_flush():
        raise IOError()
    wrapper.flush = bad_flush
    with self.assertRaises(IOError):  # exception not swallowed
        wrapper.close()
2376
def test_multi_close(self):
    # close() may be called repeatedly; flush() on the closed wrapper
    # raises ValueError.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    for _ in range(3):
        wrapper.close()
    with self.assertRaises(ValueError):
        wrapper.flush()
2383
def test_unseekable(self):
    # tell() and seek() on a wrapper around MockUnseekableIO must raise
    # UnsupportedOperation.
    wrapper = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.tell()
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.seek(0)
2388
def test_readonly_attributes(self):
    # Assigning to the wrapper's buffer attribute must raise
    # AttributeError.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    replacement = self.BytesIO(self.testdata)
    with self.assertRaises(AttributeError):
        wrapper.buffer = replacement
2394
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    source = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    wrapper = self.TextIOWrapper(source, encoding='ascii', newline='\n')
    # Reads
    self.assertEqual(wrapper.read(4), 'abcd')
    self.assertEqual(wrapper.readline(), 'efghi\n')
    remaining = [line for line in wrapper]
    self.assertEqual(remaining, ['jkl\n', 'opq\n'])
2405
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    sink = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    wrapper = self.TextIOWrapper(sink, encoding='ascii', newline='\n',
                                 write_through=True)
    for fragment in ('1', '23\n4', '5'):
        wrapper.write(fragment)
    # No flush() was called; the bytes must already be on the mock's
    # write stack.
    self.assertEqual(b''.join(sink._write_stack), b'123\n45')
2415
class CTextIOWrapperTest(TextIOWrapperTest):
    # Extra cases layered on top of the inherited TextIOWrapperTest suite.

    def test_initialization(self):
        # A rejected re-initialisation must leave the wrapper unusable:
        # after each failing __init__ call, read() raises ValueError.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        bad_newlines = ((TypeError, 42), (ValueError, 'xyzzy'))
        for exc_type, newline in bad_newlines:
            self.assertRaises(exc_type, wrapper.__init__, buffered,
                              newline=newline)
            self.assertRaises(ValueError, wrapper.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        raw_file = io.FileIO(support.TESTFN, "wb")
        writer = self.BufferedWriter(raw_file)
        wrapper = self.TextIOWrapper(writer, encoding="ascii")
        wrapper.write("456def")
        # Self-reference, so dropping the local name alone cannot free it.
        wrapper.x = wrapper
        probe = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertIsNone(probe(), probe)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references
            text_a.buddy = text_b
            text_b.buddy = text_a
        support.gc_collect()
2456
2457
class PyTextIOWrapperTest(TextIOWrapperTest):
    """Run the inherited TextIOWrapperTest cases; adds no tests of its own."""
2460
2461
class IncrementalNewlineDecoderTest(unittest.TestCase):
    # Shared cases for self.IncrementalNewlineDecoder; that attribute is
    # not defined in this class body and must be supplied from outside.

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def verify(raw, expected, **kwargs):
            # We exercise getstate() / setstate() as well as decode()
            saved = decoder.getstate()
            self.assertEqual(decoder.decode(raw, **kwargs), expected)
            # Rewind to the saved state; the same input must decode to
            # the same output a second time.
            decoder.setstate(saved)
            self.assertEqual(decoder.decode(raw, **kwargs), expected)

        def verify_all(steps):
            for raw, expected in steps:
                verify(raw, expected)

        # One three-byte character: whole, then split byte by byte (twice).
        verify_all([
            (b'\xe8\xa2\x88', "\u8888"),
            (b'\xe8', ""),
            (b'\xa2', ""),
            (b'\x88', "\u8888"),
            (b'\xe8', ""),
            (b'\xa2', ""),
            (b'\x88', "\u8888"),
            (b'\xe8', ""),
        ])
        # A dangling lead byte is an error once the input is final.
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        verify_all([
            (b'\n', "\n"),
            (b'\r', ""),
        ])
        verify(b'', "\n", final=True)
        verify(b'\r', "\n", final=True)

        verify_all([
            (b'\r', ""),
            (b'a', "\na"),
            (b'\r\r\n', "\n\n"),
            (b'\r', ""),
            (b'\r', "\n"),
            (b'\na', "\na"),
            (b'\xe8\xa2\x88\r\n', "\u8888\n"),
            (b'\xe8\xa2\x88', "\u8888"),
            (b'\n', "\n"),
            (b'\xe8\xa2\x88\r', "\u8888"),
            (b'\n', "\n"),
        ])

    def check_newline_decoding(self, decoder, encoding):
        # Feed text to *decoder* one unit at a time and check both the
        # translated output and the evolving `newlines` attribute.
        # With encoding=None the decoder is fed str characters directly;
        # otherwise the text is encoded and fed one byte at a time.
        pieces = []
        if encoding is None:
            encoder = None
        else:
            encoder = codecs.getincrementalencoder(encoding)()

        def feed(text):
            if encoder is None:
                # Decode one char at a time
                units = text
            else:
                # Decode one byte at a time
                units = (bytes([value]) for value in encoder.encode(text))
            for unit in units:
                pieces.append(decoder.decode(unit))

        self.assertEqual(decoder.newlines, None)
        expectations = (
            ("abc\n\r", '\n'),
            ("\nabc", ('\n', '\r\n')),
            ("abc\r", ('\n', '\r\n')),
            ("abc", ('\r', '\n', '\r\n')),
        )
        for text, seen in expectations:
            feed(text)
            self.assertEqual(decoder.newlines, seen)
        feed("abc\r")
        self.assertEqual("".join(pieces), "abc\n\nabcabc\nabcabc")
        decoder.reset()
        payload = "abc"
        if encoder is not None:
            encoder.reset()
            payload = encoder.encode(payload)
        self.assertEqual(decoder.decode(payload), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            inner = codecs.getincrementaldecoder(enc)() if enc else None
            wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(wrapped, enc)
        inner = codecs.getincrementaldecoder("utf-8")()
        wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding_utf8(wrapped)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        for translate in (False, True):
            dec = self.IncrementalNewlineDecoder(None, translate=translate)
            self.assertEqual(dec.newlines, None)
            # These characters must pass through unchanged and must not
            # be recorded as newlines.
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)
2567
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Run the inherited IncrementalNewlineDecoderTest cases unchanged."""
2570
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Run the inherited IncrementalNewlineDecoderTest cases unchanged."""
Antoine Pitrou180a3362008-12-14 16:36:46 +00002573
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002574
Guido van Rossum01a27522007-03-07 01:00:12 +00002575# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002576
Guido van Rossum5abbf752007-08-27 17:39:33 +00002577class MiscIOTest(unittest.TestCase):
2578
def tearDown(self):
    # Remove the scratch file after every test.
    scratch = support.TESTFN
    support.unlink(scratch)
2581
def test___all__(self):
    # Every name listed in __all__ must exist on the module.  Apart from
    # "open" and the SEEK_* names, each must be an Exception subclass
    # (error-like names) or an IOBase subclass (everything else).
    module = self.io
    for name in module.__all__:
        obj = getattr(module, name, None)
        self.assertIsNotNone(obj, name)
        if name == "open":
            continue
        if "error" in name.lower() or name == "UnsupportedOperation":
            self.assertTrue(issubclass(obj, Exception), name)
            continue
        if name.startswith("SEEK_"):
            continue
        self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002592
def test_attributes(self):
    # Check the mode and name attributes exposed at each layer of the
    # objects returned by open().
    #
    # Every file is managed by a with block so that a failing assertion
    # cannot leave it open.
    with self.open(support.TESTFN, "wb", buffering=0) as f:
        self.assertEqual(f.mode, "wb")

    with self.open(support.TESTFN, "U") as f:
        self.assertEqual(f.name,                support.TESTFN)
        self.assertEqual(f.buffer.name,         support.TESTFN)
        self.assertEqual(f.buffer.raw.name,     support.TESTFN)
        self.assertEqual(f.mode,                "U")
        self.assertEqual(f.buffer.mode,         "rb")
        self.assertEqual(f.buffer.raw.mode,     "rb")

    with self.open(support.TESTFN, "w+") as f:
        self.assertEqual(f.mode,            "w+")
        self.assertEqual(f.buffer.mode,     "rb+")  # Does it really matter?
        self.assertEqual(f.buffer.raw.mode, "rb+")

        # A second object on the same descriptor; closefd=False means
        # closing g leaves the descriptor owned by f untouched.
        with self.open(f.fileno(), "wb", closefd=False) as g:
            self.assertEqual(g.mode,     "wb")
            self.assertEqual(g.raw.mode, "wb")
            self.assertEqual(g.name,     f.fileno())
            self.assertEqual(g.raw.name, f.fileno())
2619
def test_io_after_close(self):
    # Every I/O method of a closed file must raise ValueError, for each
    # combination of mode and buffering listed below.
    open_kwargs = []
    # The "w" group comes first so the file exists before it is opened
    # for reading.
    for text_mode, binary_mode in (("w", "wb"), ("r", "rb"), ("w+", "w+b")):
        open_kwargs += [
            {"mode": text_mode},
            {"mode": binary_mode},
            {"mode": text_mode, "buffering": 1},
            {"mode": text_mode, "buffering": 2},
            {"mode": binary_mode, "buffering": 0},
        ]
    for kwargs in open_kwargs:
        f = self.open(support.TESTFN, **kwargs)
        f.close()
        # write() needs an argument matching the text/binary kind.
        empty = b"" if "b" in kwargs['mode'] else ""
        # (method name, call arguments, only checked when present)
        checks = (
            ("flush", (), False),
            ("fileno", (), False),
            ("isatty", (), False),
            ("__iter__", (), False),
            ("peek", (1,), True),
            ("read", (), False),
            ("read1", (1024,), True),
            ("readall", (), True),
            ("readinto", (bytearray(1024),), True),
            ("readline", (), False),
            ("readlines", (), False),
            ("seek", (0,), False),
            ("tell", (), False),
            ("truncate", (), False),
            ("write", (empty,), False),
            ("writelines", ([],), False),
        )
        for name, call_args, optional in checks:
            if optional and not hasattr(f, name):
                continue
            self.assertRaises(ValueError, getattr(f, name), *call_args)
        self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002662
def test_blockingioerror(self):
    # Various BlockingIOError issues
    for bad_args in ((), (1,), (1, 2, 3, 4), (1, "", None)):
        self.assertRaises(TypeError, self.BlockingIOError, *bad_args)
    exc = self.BlockingIOError(1, "")
    self.assertEqual(exc.characters_written, 0)
    class C(str):
        pass
    payload = C("")
    exc = self.BlockingIOError(1, payload)
    # Build a reference cycle between the exception and its argument,
    # then check that a collection pass reclaims it.
    payload.b = exc
    exc.c = payload
    probe = weakref.ref(payload)
    del payload, exc
    support.gc_collect()
    self.assertIsNone(probe(), probe)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002681
def test_abcs(self):
    # Test the visible base classes are ABCs.
    bases = (self.IOBase, self.RawIOBase,
             self.BufferedIOBase, self.TextIOBase)
    for base in bases:
        self.assertIsInstance(base, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002688
def _check_abc_inheritance(self, abcmodule):
    # Helper: open the scratch file unbuffered-binary, buffered-binary
    # and text, and check that each object is an instance of
    # abcmodule.IOBase plus exactly one of the three layer ABCs.
    layers = ("RawIOBase", "BufferedIOBase", "TextIOBase")
    cases = (
        ("wb", {"buffering": 0}, "RawIOBase"),
        ("wb", {}, "BufferedIOBase"),
        ("w", {}, "TextIOBase"),
    )
    for mode, extra, own_layer in cases:
        with self.open(support.TESTFN, mode, **extra) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            for layer in layers:
                base = getattr(abcmodule, layer)
                if layer == own_layer:
                    self.assertIsInstance(f, base)
                else:
                    self.assertNotIsInstance(f, base)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002705
def test_abc_inheritance(self):
    # Test implementations inherit from their respective ABCs
    # (the test case itself is passed as the source of the ABC names).
    abc_source = self
    self._check_abc_inheritance(abc_source)
2709
def test_abc_inheritance_official(self):
    # Test implementations inherit from the official ABCs of the
    # baseline "io" module.
    abc_source = io
    self._check_abc_inheritance(abc_source)
2714
def _check_warn_on_dealloc(self, *args, **kwargs):
    # Helper: open a file with the given arguments, drop the only
    # reference without closing it, and check that a ResourceWarning
    # mentioning the object's repr is emitted.
    # Note: this calls the builtin open(), not self.open.
    handle = open(*args, **kwargs)
    expected_repr = repr(handle)
    with self.assertWarns(ResourceWarning) as caught:
        del handle
        support.gc_collect()
    self.assertIn(expected_repr, str(caught.warning.args[0]))
2722
def test_warn_on_dealloc(self):
    # Unbuffered binary, buffered binary, then text.
    variants = (
        ("wb", {"buffering": 0}),
        ("wb", {}),
        ("w", {}),
    )
    for mode, extra in variants:
        self._check_warn_on_dealloc(support.TESTFN, mode, **extra)
2727
def _check_warn_on_dealloc_fd(self, *args, **kwargs):
    """Check the deallocation warning for files wrapping a descriptor.

    A file opened on the read end of a pipe must warn when dropped
    unclosed, unless it was opened with ``closefd=False``.  *args* and
    *kwargs* are passed to ``open()`` after the descriptor.
    """
    opened_fds = []

    def close_leftover_fds():
        # Descriptors may already have been closed by the file object
        # that wrapped them; only that error (EBADF) is tolerated.
        for leftover in opened_fds:
            try:
                os.close(leftover)
            except EnvironmentError as exc:
                if exc.errno == errno.EBADF:
                    continue
                raise

    self.addCleanup(close_leftover_fds)

    read_fd, write_fd = os.pipe()
    opened_fds.extend((read_fd, write_fd))
    self._check_warn_on_dealloc(read_fd, *args, **kwargs)

    # When using closefd=False, there's no warning
    read_fd, write_fd = os.pipe()
    opened_fds.extend((read_fd, write_fd))
    with warnings.catch_warnings(record=True) as recorded:
        open(read_fd, *args, closefd=False, **kwargs)
        support.gc_collect()
    self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00002748
def test_warn_on_dealloc_fd(self):
    # Exercise an unbuffered binary, a buffered binary and a text reader,
    # each wrapping a pipe descriptor.
    open_variants = (
        ("rb", {"buffering": 0}),
        ("rb", {}),
        ("r", {}),
    )
    for mode, options in open_variants:
        self._check_warn_on_dealloc_fd(mode, **options)
2753
2754
def test_pickling(self):
    # Pickling file objects is forbidden, whatever the mode, the
    # buffering and the pickle protocol.
    open_kwargs = []
    for access in ("w", "r", "w+"):
        # Text, buffered binary, then unbuffered binary.
        open_kwargs.append({"mode": access})
        open_kwargs.append({"mode": access + "b"})
        open_kwargs.append({"mode": access + "b", "buffering": 0})
    protocols = range(pickle.HIGHEST_PROTOCOL + 1)
    for kwargs in open_kwargs:
        for protocol in protocols:
            with self.open(support.TESTFN, **kwargs) as stream:
                with self.assertRaises(TypeError):
                    pickle.dumps(stream, protocol)
2771
@unittest.skipUnless(fcntl, 'fcntl required for this test')
def test_nonblock_pipe_write_bigbuf(self):
    # Run the non-blocking pipe scenario with a 16 KiB buffer.
    bufsize = 16 * 1024
    self._test_nonblock_pipe_write(bufsize)
2775
@unittest.skipUnless(fcntl, 'fcntl required for this test')
def test_nonblock_pipe_write_smallbuf(self):
    # Run the non-blocking pipe scenario with a 1 KiB buffer.
    bufsize = 1024
    self._test_nonblock_pipe_write(bufsize)
2779
def _set_non_blocking(self, fd):
    """Add O_NONBLOCK to the status flags of descriptor *fd* via fcntl()."""
    current_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    self.assertNotEqual(current_flags, -1)
    wanted_flags = current_flags | os.O_NONBLOCK
    status = fcntl.fcntl(fd, fcntl.F_SETFL, wanted_flags)
    self.assertEqual(status, 0)
2785
def _test_nonblock_pipe_write(self, bufsize):
    """Write through a non-blocking pipe until it blocks, and check that
    nothing is lost or duplicated.

    Both ends of a pipe are made non-blocking and wrapped in file objects
    using *bufsize* as their buffer size.  Data is written until
    BlockingIOError is raised, the pipe is drained, and the bytes received
    are finally compared with the bytes accounted as sent.
    """
    sent = []
    received = []
    r, w = os.pipe()
    self._set_non_blocking(r)
    self._set_non_blocking(w)

    # To exercise all code paths in the C implementation we need
    # to play with buffer sizes.  For instance, if we choose a
    # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
    # then we will never get a partial write of the buffer.
    rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
    wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

    with rf, wf:
        for N in 9999, 73, 7574:
            try:
                i = 0
                while True:
                    # N copies of a lowercase letter cycling through a-z.
                    msg = bytes([i % 26 + 97]) * N
                    sent.append(msg)
                    wf.write(msg)
                    i += 1

            except self.BlockingIOError as e:
                self.assertEqual(e.args[0], errno.EAGAIN)
                # Only the part reported as written counts as sent.
                sent[-1] = sent[-1][:e.characters_written]
                received.append(rf.read())
                msg = b'BLOCKED'
                wf.write(msg)
                sent.append(msg)

        # Keep flushing, draining the pipe each time the flush would block.
        while True:
            try:
                wf.flush()
                break
            except self.BlockingIOError as e:
                self.assertEqual(e.args[0], errno.EAGAIN)
                self.assertEqual(e.characters_written, 0)
                received.append(rf.read())

        # Collect what is left: keep reading until read() returns None.
        received += iter(rf.read, None)

    sent, received = b''.join(sent), b''.join(received)
    # assertEqual (rather than assertTrue on a comparison) so that a
    # failure shows the two values instead of "False is not true".
    self.assertEqual(sent, received)
    self.assertTrue(wf.closed)
    self.assertTrue(rf.closed)
2833
class CMiscIOTest(MiscIOTest):
    # Runs the MiscIOTest cases with ``io`` as the module under test.
    io = io
2836
class PyMiscIOTest(MiscIOTest):
    # Runs the MiscIOTest cases with ``pyio`` as the module under test.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002839
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002840
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    # Checks how file objects behave when a read or write is interrupted by
    # SIGALRM.  Subclasses set the ``io`` attribute to the module under test.

    def setUp(self):
        # Install a handler that raises ZeroDivisionError, remembering the
        # previous one so that tearDown() can restore it.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        # Cancel any alarm that is still pending (for instance when a check
        # failed before the timer fired), so that it cannot go off once the
        # previous handler is back in place.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # SIGALRM handler installed by setUp(): raises ZeroDivisionError.
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
                     'issue #12429: skip test on FreeBSD <= 7')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        *item* is written repeated 1024 * 1024 times; the first two bytes
        read back from the pipe are compared with the first two bytes of
        *bytes*.  *fdopen_kwargs* are passed to ``self.io.open()``.
        """
        read_results = []
        def _read():
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # Bound before the try block so that the cleanup below never hits an
        # unbound name when open() itself fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            signal.alarm(1)
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed above.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            self.assertRaises(ZeroDivisionError,
                              wio.write, item * (1024 * 1024))
            t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check what happens when a signal handler writes to the file that
        the interrupted code is itself writing to.

        *data* is written in a loop until SIGALRM fires; the handler then
        writes *data* to the same object.  The outcome must be either
        ZeroDivisionError (raised by the handler) or a RuntimeError whose
        message starts with "reentrant call".
        """
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *decode* converts the value returned by ``read()`` to ``str`` for
        comparison; *fdopen_kwargs* are passed to ``self.io.open()``.
        """
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound before the try block so that a failing open() neither masks
        # its own error nor prevents the pipe from being closed.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *item* is written repeated N times (N = 1024 * 1024);
        *fdopen_kwargs* are passed to ``self.io.open()``.
        """
        select = support.import_module("select")
        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = 1024 * 1024
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        # Read by _read() through the closure; set to True below to make the
        # reader thread stop.
        write_finished = False
        def _read():
            while not write_finished:
                while r in select.select([r], [], [], 1.0)[0]:
                    s = os.read(r, 1024)
                    read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            t.start()
        signal.signal(signal.SIGALRM, alarm1)
        # Bound before the try block so that the cleanup below never hits an
        # unbound name when open() itself fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            self.assertEqual(N, wio.write(item * N))
            wio.flush()
            write_finished = True
            t.join()
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3029
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003030
class CSignalsTest(SignalsTest):
    # Runs the SignalsTest cases with ``io`` as the module under test.
    io = io
3033
class PySignalsTest(SignalsTest):
    # Runs the SignalsTest cases with ``pyio`` as the module under test.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = test_reentrant_write_text = None
3041
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003042
def test_main():
    """Inject the io namespaces into the test classes, then run them.

    Classes whose name starts with "C" receive the members of ``io`` plus
    the "C"-prefixed mock classes; those starting with "Py" receive the
    members of ``pyio`` plus the "Py"-prefixed mock classes.  Other classes
    are left untouched.
    """
    test_classes = (
        CIOTest, PyIOTest,
        CBufferedReaderTest, PyBufferedReaderTest,
        CBufferedWriterTest, PyBufferedWriterTest,
        CBufferedRWPairTest, PyBufferedRWPairTest,
        CBufferedRandomTest, PyBufferedRandomTest,
        StatefulIncrementalDecoderTest,
        CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
        CTextIOWrapperTest, PyTextIOWrapperTest,
        CMiscIOTest, PyMiscIOTest,
        CSignalsTest, PySignalsTest,
    )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mock_classes = (
        MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
        MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
    )
    exported_names = io.__all__ + ["IncrementalNewlineDecoder"]

    c_namespace = {}
    for member in exported_names:
        c_namespace[member] = getattr(io, member)
    py_namespace = {}
    for member in exported_names:
        py_namespace[member] = getattr(pyio, member)

    # Each mock is stored under its generic name but resolved to the
    # implementation-specific class defined in this module.
    module_globals = globals()
    for mock in mock_classes:
        c_namespace[mock.__name__] = module_globals["C" + mock.__name__]
    for mock in mock_classes:
        py_namespace[mock.__name__] = module_globals["Py" + mock.__name__]

    # Avoid turning open into a bound method.
    py_namespace["open"] = pyio.OpenWrapper

    for test_class in test_classes:
        class_name = test_class.__name__
        if class_name.startswith("C"):
            namespace = c_namespace
        elif class_name.startswith("Py"):
            namespace = py_namespace
        else:
            continue
        for attr_name, value in namespace.items():
            setattr(test_class, attr_name, value)

    support.run_unittest(*test_classes)
Guido van Rossum28524c72007-02-27 05:47:44 +00003077
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()