blob: 0c3e0b88d812fe757d80de27c6cd51f24cf90243 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import random
Guido van Rossum28524c72007-02-27 05:47:44 +000027import unittest
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000028import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000029import abc
Antoine Pitroub46b9d52010-08-21 19:09:32 +000030import signal
31import errno
Antoine Pitroue033e062010-10-29 10:38:18 +000032import warnings
Antoine Pitrou243757e2010-11-05 21:15:39 +000033import pickle
Georg Brandl1b37e872010-03-14 10:45:50 +000034from itertools import cycle, count
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000035from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000036from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000037
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000038import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000039import io # C implementation of io
40import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000041try:
42 import threading
43except ImportError:
44 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010045try:
46 import fcntl
47except ImportError:
48 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000049
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000050def _default_chunk_size():
51 """Get the default TextIOWrapper chunk size"""
52 with open(__file__, "r", encoding="latin1") as f:
53 return f._CHUNK_SIZE
54
55
Antoine Pitrou328ec742010-09-14 18:37:24 +000056class MockRawIOWithoutRead:
57 """A RawIO implementation without read(), so as to exercise the default
58 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000059
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000060 def __init__(self, read_stack=()):
61 self._read_stack = list(read_stack)
62 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000063 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000064 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000065
Guido van Rossum01a27522007-03-07 01:00:12 +000066 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000067 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000068 return len(b)
69
70 def writable(self):
71 return True
72
Guido van Rossum68bbcd22007-02-27 17:19:33 +000073 def fileno(self):
74 return 42
75
76 def readable(self):
77 return True
78
Guido van Rossum01a27522007-03-07 01:00:12 +000079 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000080 return True
81
Guido van Rossum01a27522007-03-07 01:00:12 +000082 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000083 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000084
85 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000086 return 0 # same comment as above
87
88 def readinto(self, buf):
89 self._reads += 1
90 max_len = len(buf)
91 try:
92 data = self._read_stack[0]
93 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +000094 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000095 return 0
96 if data is None:
97 del self._read_stack[0]
98 return None
99 n = len(data)
100 if len(data) <= max_len:
101 del self._read_stack[0]
102 buf[:n] = data
103 return n
104 else:
105 buf[:] = data[:max_len]
106 self._read_stack[0] = data[max_len:]
107 return max_len
108
109 def truncate(self, pos=None):
110 return pos
111
Antoine Pitrou328ec742010-09-14 18:37:24 +0000112class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
113 pass
114
115class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
116 pass
117
118
119class MockRawIO(MockRawIOWithoutRead):
120
121 def read(self, n=None):
122 self._reads += 1
123 try:
124 return self._read_stack.pop(0)
125 except:
126 self._extraneous_reads += 1
127 return b""
128
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000129class CMockRawIO(MockRawIO, io.RawIOBase):
130 pass
131
132class PyMockRawIO(MockRawIO, pyio.RawIOBase):
133 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000134
Guido van Rossuma9e20242007-03-08 00:43:48 +0000135
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000136class MisbehavedRawIO(MockRawIO):
137 def write(self, b):
138 return super().write(b) * 2
139
140 def read(self, n=None):
141 return super().read(n) * 2
142
143 def seek(self, pos, whence):
144 return -123
145
146 def tell(self):
147 return -456
148
149 def readinto(self, buf):
150 super().readinto(buf)
151 return len(buf) * 5
152
153class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
154 pass
155
156class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
157 pass
158
159
160class CloseFailureIO(MockRawIO):
161 closed = 0
162
163 def close(self):
164 if not self.closed:
165 self.closed = 1
166 raise IOError
167
168class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
169 pass
170
171class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
172 pass
173
174
175class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000176
177 def __init__(self, data):
178 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000179 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000180
181 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000182 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000183 self.read_history.append(None if res is None else len(res))
184 return res
185
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000186 def readinto(self, b):
187 res = super().readinto(b)
188 self.read_history.append(res)
189 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000190
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000191class CMockFileIO(MockFileIO, io.BytesIO):
192 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000193
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000194class PyMockFileIO(MockFileIO, pyio.BytesIO):
195 pass
196
197
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000198class MockUnseekableIO:
199 def seekable(self):
200 return False
201
202 def seek(self, *args):
203 raise self.UnsupportedOperation("not seekable")
204
205 def tell(self, *args):
206 raise self.UnsupportedOperation("not seekable")
207
208class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
209 UnsupportedOperation = io.UnsupportedOperation
210
211class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
212 UnsupportedOperation = pyio.UnsupportedOperation
213
214
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000215class MockNonBlockWriterIO:
216
217 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000218 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000219 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000220
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000221 def pop_written(self):
222 s = b"".join(self._write_stack)
223 self._write_stack[:] = []
224 return s
225
226 def block_on(self, char):
227 """Block when a given char is encountered."""
228 self._blocker_char = char
229
230 def readable(self):
231 return True
232
233 def seekable(self):
234 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000235
Guido van Rossum01a27522007-03-07 01:00:12 +0000236 def writable(self):
237 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000238
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000239 def write(self, b):
240 b = bytes(b)
241 n = -1
242 if self._blocker_char:
243 try:
244 n = b.index(self._blocker_char)
245 except ValueError:
246 pass
247 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100248 if n > 0:
249 # write data up to the first blocker
250 self._write_stack.append(b[:n])
251 return n
252 else:
253 # cancel blocker and indicate would block
254 self._blocker_char = None
255 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000256 self._write_stack.append(b)
257 return len(b)
258
259class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
260 BlockingIOError = io.BlockingIOError
261
262class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
263 BlockingIOError = pyio.BlockingIOError
264
Guido van Rossuma9e20242007-03-08 00:43:48 +0000265
Guido van Rossum28524c72007-02-27 05:47:44 +0000266class IOTest(unittest.TestCase):
267
Neal Norwitze7789b12008-03-24 06:18:09 +0000268 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000269 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000270
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000271 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000272 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000273
Guido van Rossum28524c72007-02-27 05:47:44 +0000274 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000275 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000276 f.truncate(0)
277 self.assertEqual(f.tell(), 5)
278 f.seek(0)
279
280 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000281 self.assertEqual(f.seek(0), 0)
282 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000283 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000284 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000285 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000286 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000287 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000288 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000289 self.assertEqual(f.seek(-1, 2), 13)
290 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000291
Guido van Rossum87429772007-04-10 21:06:59 +0000292 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000293 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000294 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000295
Guido van Rossum9b76da62007-04-11 01:09:03 +0000296 def read_ops(self, f, buffered=False):
297 data = f.read(5)
298 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000299 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000300 self.assertEqual(f.readinto(data), 5)
301 self.assertEqual(data, b" worl")
302 self.assertEqual(f.readinto(data), 2)
303 self.assertEqual(len(data), 5)
304 self.assertEqual(data[:2], b"d\n")
305 self.assertEqual(f.seek(0), 0)
306 self.assertEqual(f.read(20), b"hello world\n")
307 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000308 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000309 self.assertEqual(f.seek(-6, 2), 6)
310 self.assertEqual(f.read(5), b"world")
311 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000312 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000313 self.assertEqual(f.seek(-6, 1), 5)
314 self.assertEqual(f.read(5), b" worl")
315 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000316 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000317 if buffered:
318 f.seek(0)
319 self.assertEqual(f.read(), b"hello world\n")
320 f.seek(6)
321 self.assertEqual(f.read(), b"world\n")
322 self.assertEqual(f.read(), b"")
323
Guido van Rossum34d69e52007-04-10 20:08:41 +0000324 LARGE = 2**31
325
Guido van Rossum53807da2007-04-10 19:01:47 +0000326 def large_file_ops(self, f):
327 assert f.readable()
328 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000329 self.assertEqual(f.seek(self.LARGE), self.LARGE)
330 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000331 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000332 self.assertEqual(f.tell(), self.LARGE + 3)
333 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000334 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000335 self.assertEqual(f.tell(), self.LARGE + 2)
336 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000337 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000338 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000339 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
340 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000341 self.assertEqual(f.read(2), b"x")
342
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000343 def test_invalid_operations(self):
344 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000345 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000346 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000347 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000348 self.assertRaises(exc, fp.read)
349 self.assertRaises(exc, fp.readline)
350 with self.open(support.TESTFN, "wb", buffering=0) as fp:
351 self.assertRaises(exc, fp.read)
352 self.assertRaises(exc, fp.readline)
353 with self.open(support.TESTFN, "rb", buffering=0) as fp:
354 self.assertRaises(exc, fp.write, b"blah")
355 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000356 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000357 self.assertRaises(exc, fp.write, b"blah")
358 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000359 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000360 self.assertRaises(exc, fp.write, "blah")
361 self.assertRaises(exc, fp.writelines, ["blah\n"])
362 # Non-zero seeking from current or end pos
363 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
364 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000365
Antoine Pitrou13348842012-01-29 18:36:34 +0100366 def test_open_handles_NUL_chars(self):
367 fn_with_NUL = 'foo\0bar'
368 self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
369 self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')
370
Guido van Rossum28524c72007-02-27 05:47:44 +0000371 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000372 with self.open(support.TESTFN, "wb", buffering=0) as f:
373 self.assertEqual(f.readable(), False)
374 self.assertEqual(f.writable(), True)
375 self.assertEqual(f.seekable(), True)
376 self.write_ops(f)
377 with self.open(support.TESTFN, "rb", buffering=0) as f:
378 self.assertEqual(f.readable(), True)
379 self.assertEqual(f.writable(), False)
380 self.assertEqual(f.seekable(), True)
381 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000382
Guido van Rossum87429772007-04-10 21:06:59 +0000383 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000384 with self.open(support.TESTFN, "wb") as f:
385 self.assertEqual(f.readable(), False)
386 self.assertEqual(f.writable(), True)
387 self.assertEqual(f.seekable(), True)
388 self.write_ops(f)
389 with self.open(support.TESTFN, "rb") as f:
390 self.assertEqual(f.readable(), True)
391 self.assertEqual(f.writable(), False)
392 self.assertEqual(f.seekable(), True)
393 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000394
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000395 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000396 with self.open(support.TESTFN, "wb") as f:
397 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
398 with self.open(support.TESTFN, "rb") as f:
399 self.assertEqual(f.readline(), b"abc\n")
400 self.assertEqual(f.readline(10), b"def\n")
401 self.assertEqual(f.readline(2), b"xy")
402 self.assertEqual(f.readline(4), b"zzy\n")
403 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000404 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000405 self.assertRaises(TypeError, f.readline, 5.3)
406 with self.open(support.TESTFN, "r") as f:
407 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000408
Guido van Rossum28524c72007-02-27 05:47:44 +0000409 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000410 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000411 self.write_ops(f)
412 data = f.getvalue()
413 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000414 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000415 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000416
Guido van Rossum53807da2007-04-10 19:01:47 +0000417 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000418 # On Windows and Mac OSX this test comsumes large resources; It takes
419 # a long time to build the >2GB file and takes >2GB of disk space
420 # therefore the resource must be enabled to run this test.
421 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000422 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000423 print("\nTesting large file ops skipped on %s." % sys.platform,
424 file=sys.stderr)
425 print("It requires %d bytes and a long time." % self.LARGE,
426 file=sys.stderr)
427 print("Use 'regrtest.py -u largefile test_io' to run it.",
428 file=sys.stderr)
429 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000430 with self.open(support.TESTFN, "w+b", 0) as f:
431 self.large_file_ops(f)
432 with self.open(support.TESTFN, "w+b") as f:
433 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000434
435 def test_with_open(self):
436 for bufsize in (0, 1, 100):
437 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000438 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000439 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000440 self.assertEqual(f.closed, True)
441 f = None
442 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000443 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000444 1/0
445 except ZeroDivisionError:
446 self.assertEqual(f.closed, True)
447 else:
448 self.fail("1/0 didn't raise an exception")
449
Antoine Pitrou08838b62009-01-21 00:55:13 +0000450 # issue 5008
451 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000452 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000453 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000454 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000455 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000456 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000457 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000458 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000459 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000460
Guido van Rossum87429772007-04-10 21:06:59 +0000461 def test_destructor(self):
462 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000463 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000464 def __del__(self):
465 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000466 try:
467 f = super().__del__
468 except AttributeError:
469 pass
470 else:
471 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000472 def close(self):
473 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000474 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000475 def flush(self):
476 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000477 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000478 with support.check_warnings(('', ResourceWarning)):
479 f = MyFileIO(support.TESTFN, "wb")
480 f.write(b"xxx")
481 del f
482 support.gc_collect()
483 self.assertEqual(record, [1, 2, 3])
484 with self.open(support.TESTFN, "rb") as f:
485 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000486
487 def _check_base_destructor(self, base):
488 record = []
489 class MyIO(base):
490 def __init__(self):
491 # This exercises the availability of attributes on object
492 # destruction.
493 # (in the C version, close() is called by the tp_dealloc
494 # function, not by __del__)
495 self.on_del = 1
496 self.on_close = 2
497 self.on_flush = 3
498 def __del__(self):
499 record.append(self.on_del)
500 try:
501 f = super().__del__
502 except AttributeError:
503 pass
504 else:
505 f()
506 def close(self):
507 record.append(self.on_close)
508 super().close()
509 def flush(self):
510 record.append(self.on_flush)
511 super().flush()
512 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000513 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000514 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000515 self.assertEqual(record, [1, 2, 3])
516
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000517 def test_IOBase_destructor(self):
518 self._check_base_destructor(self.IOBase)
519
520 def test_RawIOBase_destructor(self):
521 self._check_base_destructor(self.RawIOBase)
522
523 def test_BufferedIOBase_destructor(self):
524 self._check_base_destructor(self.BufferedIOBase)
525
526 def test_TextIOBase_destructor(self):
527 self._check_base_destructor(self.TextIOBase)
528
Guido van Rossum87429772007-04-10 21:06:59 +0000529 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000530 with self.open(support.TESTFN, "wb") as f:
531 f.write(b"xxx")
532 with self.open(support.TESTFN, "rb") as f:
533 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000534
Guido van Rossumd4103952007-04-12 05:44:49 +0000535 def test_array_writes(self):
536 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000537 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000538 with self.open(support.TESTFN, "wb", 0) as f:
539 self.assertEqual(f.write(a), n)
540 with self.open(support.TESTFN, "wb") as f:
541 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000542
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000543 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000544 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000545 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000546
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000547 def test_read_closed(self):
548 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000549 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000550 with self.open(support.TESTFN, "r") as f:
551 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000552 self.assertEqual(file.read(), "egg\n")
553 file.seek(0)
554 file.close()
555 self.assertRaises(ValueError, file.read)
556
557 def test_no_closefd_with_filename(self):
558 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000559 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000560
561 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000562 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000563 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000564 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000565 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000566 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000567 self.assertEqual(file.buffer.raw.closefd, False)
568
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000569 def test_garbage_collection(self):
570 # FileIO objects are collected, and collecting them flushes
571 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000572 with support.check_warnings(('', ResourceWarning)):
573 f = self.FileIO(support.TESTFN, "wb")
574 f.write(b"abcxxx")
575 f.f = f
576 wr = weakref.ref(f)
577 del f
578 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000579 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000580 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000581 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000582
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000583 def test_unbounded_file(self):
584 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
585 zero = "/dev/zero"
586 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000587 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000588 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000589 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000590 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000591 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000592 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000593 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000594 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000595 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000596 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000597 self.assertRaises(OverflowError, f.read)
598
Antoine Pitrou6be88762010-05-03 16:48:20 +0000599 def test_flush_error_on_close(self):
600 f = self.open(support.TESTFN, "wb", buffering=0)
601 def bad_flush():
602 raise IOError()
603 f.flush = bad_flush
604 self.assertRaises(IOError, f.close) # exception not swallowed
605
606 def test_multi_close(self):
607 f = self.open(support.TESTFN, "wb", buffering=0)
608 f.close()
609 f.close()
610 f.close()
611 self.assertRaises(ValueError, f.flush)
612
Antoine Pitrou328ec742010-09-14 18:37:24 +0000613 def test_RawIOBase_read(self):
614 # Exercise the default RawIOBase.read() implementation (which calls
615 # readinto() internally).
616 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
617 self.assertEqual(rawio.read(2), b"ab")
618 self.assertEqual(rawio.read(2), b"c")
619 self.assertEqual(rawio.read(2), b"d")
620 self.assertEqual(rawio.read(2), None)
621 self.assertEqual(rawio.read(2), b"ef")
622 self.assertEqual(rawio.read(2), b"g")
623 self.assertEqual(rawio.read(2), None)
624 self.assertEqual(rawio.read(2), b"")
625
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400626 def test_types_have_dict(self):
627 test = (
628 self.IOBase(),
629 self.RawIOBase(),
630 self.TextIOBase(),
631 self.StringIO(),
632 self.BytesIO()
633 )
634 for obj in test:
635 self.assertTrue(hasattr(obj, "__dict__"))
636
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200637 def test_fileio_closefd(self):
638 # Issue #4841
639 with self.open(__file__, 'rb') as f1, \
640 self.open(__file__, 'rb') as f2:
641 fileio = self.FileIO(f1.fileno(), closefd=False)
642 # .__init__() must not close f1
643 fileio.__init__(f2.fileno(), closefd=False)
644 f1.readline()
645 # .close() must not close f2
646 fileio.close()
647 f2.readline()
648
649
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000650class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +0200651
652 def test_IOBase_finalize(self):
653 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
654 # class which inherits IOBase and an object of this class are caught
655 # in a reference cycle and close() is already in the method cache.
656 class MyIO(self.IOBase):
657 def close(self):
658 pass
659
660 # create an instance to populate the method cache
661 MyIO()
662 obj = MyIO()
663 obj.obj = obj
664 wr = weakref.ref(obj)
665 del MyIO
666 del obj
667 support.gc_collect()
668 self.assertTrue(wr() is None, wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000669
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000670class PyIOTest(IOTest):
671 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000672
Guido van Rossuma9e20242007-03-08 00:43:48 +0000673
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000674class CommonBufferedTests:
675 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
676
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000677 def test_detach(self):
678 raw = self.MockRawIO()
679 buf = self.tp(raw)
680 self.assertIs(buf.detach(), raw)
681 self.assertRaises(ValueError, buf.detach)
682
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000683 def test_fileno(self):
684 rawio = self.MockRawIO()
685 bufio = self.tp(rawio)
686
Ezio Melottib3aedd42010-11-20 19:04:17 +0000687 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000688
689 def test_no_fileno(self):
690 # XXX will we always have fileno() function? If so, kill
691 # this test. Else, write it.
692 pass
693
694 def test_invalid_args(self):
695 rawio = self.MockRawIO()
696 bufio = self.tp(rawio)
697 # Invalid whence
698 self.assertRaises(ValueError, bufio.seek, 0, -1)
699 self.assertRaises(ValueError, bufio.seek, 0, 3)
700
701 def test_override_destructor(self):
702 tp = self.tp
703 record = []
704 class MyBufferedIO(tp):
705 def __del__(self):
706 record.append(1)
707 try:
708 f = super().__del__
709 except AttributeError:
710 pass
711 else:
712 f()
713 def close(self):
714 record.append(2)
715 super().close()
716 def flush(self):
717 record.append(3)
718 super().flush()
719 rawio = self.MockRawIO()
720 bufio = MyBufferedIO(rawio)
721 writable = bufio.writable()
722 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000723 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000724 if writable:
725 self.assertEqual(record, [1, 2, 3])
726 else:
727 self.assertEqual(record, [1, 2])
728
729 def test_context_manager(self):
730 # Test usability as a context manager
731 rawio = self.MockRawIO()
732 bufio = self.tp(rawio)
733 def _with():
734 with bufio:
735 pass
736 _with()
737 # bufio should now be closed, and using it a second time should raise
738 # a ValueError.
739 self.assertRaises(ValueError, _with)
740
741 def test_error_through_destructor(self):
742 # Test that the exception state is not modified by a destructor,
743 # even if close() fails.
744 rawio = self.CloseFailureIO()
745 def f():
746 self.tp(rawio).xyzzy
747 with support.captured_output("stderr") as s:
748 self.assertRaises(AttributeError, f)
749 s = s.getvalue().strip()
750 if s:
751 # The destructor *may* have printed an unraisable error, check it
752 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000753 self.assertTrue(s.startswith("Exception IOError: "), s)
754 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000755
Antoine Pitrou716c4442009-05-23 19:04:03 +0000756 def test_repr(self):
757 raw = self.MockRawIO()
758 b = self.tp(raw)
759 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
760 self.assertEqual(repr(b), "<%s>" % clsname)
761 raw.name = "dummy"
762 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
763 raw.name = b"dummy"
764 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
765
Antoine Pitrou6be88762010-05-03 16:48:20 +0000766 def test_flush_error_on_close(self):
767 raw = self.MockRawIO()
768 def bad_flush():
769 raise IOError()
770 raw.flush = bad_flush
771 b = self.tp(raw)
772 self.assertRaises(IOError, b.close) # exception not swallowed
773
774 def test_multi_close(self):
775 raw = self.MockRawIO()
776 b = self.tp(raw)
777 b.close()
778 b.close()
779 b.close()
780 self.assertRaises(ValueError, b.flush)
781
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000782 def test_unseekable(self):
783 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
784 self.assertRaises(self.UnsupportedOperation, bufio.tell)
785 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
786
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000787 def test_readonly_attributes(self):
788 raw = self.MockRawIO()
789 buf = self.tp(raw)
790 x = self.MockRawIO()
791 with self.assertRaises(AttributeError):
792 buf.raw = x
793
Guido van Rossum78892e42007-04-06 17:31:18 +0000794
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200795class SizeofTest:
796
797 @support.cpython_only
798 def test_sizeof(self):
799 bufsize1 = 4096
800 bufsize2 = 8192
801 rawio = self.MockRawIO()
802 bufio = self.tp(rawio, buffer_size=bufsize1)
803 size = sys.getsizeof(bufio) - bufsize1
804 rawio = self.MockRawIO()
805 bufio = self.tp(rawio, buffer_size=bufsize2)
806 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
807
808
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000809class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
810 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000811
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000812 def test_constructor(self):
813 rawio = self.MockRawIO([b"abc"])
814 bufio = self.tp(rawio)
815 bufio.__init__(rawio)
816 bufio.__init__(rawio, buffer_size=1024)
817 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000818 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000819 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
820 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
821 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
822 rawio = self.MockRawIO([b"abc"])
823 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000824 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000825
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000826 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000827 for arg in (None, 7):
828 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
829 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000830 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000831 # Invalid args
832 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000833
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000834 def test_read1(self):
835 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
836 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000837 self.assertEqual(b"a", bufio.read(1))
838 self.assertEqual(b"b", bufio.read1(1))
839 self.assertEqual(rawio._reads, 1)
840 self.assertEqual(b"c", bufio.read1(100))
841 self.assertEqual(rawio._reads, 1)
842 self.assertEqual(b"d", bufio.read1(100))
843 self.assertEqual(rawio._reads, 2)
844 self.assertEqual(b"efg", bufio.read1(100))
845 self.assertEqual(rawio._reads, 3)
846 self.assertEqual(b"", bufio.read1(100))
847 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000848 # Invalid args
849 self.assertRaises(ValueError, bufio.read1, -1)
850
851 def test_readinto(self):
852 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
853 bufio = self.tp(rawio)
854 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000855 self.assertEqual(bufio.readinto(b), 2)
856 self.assertEqual(b, b"ab")
857 self.assertEqual(bufio.readinto(b), 2)
858 self.assertEqual(b, b"cd")
859 self.assertEqual(bufio.readinto(b), 2)
860 self.assertEqual(b, b"ef")
861 self.assertEqual(bufio.readinto(b), 1)
862 self.assertEqual(b, b"gf")
863 self.assertEqual(bufio.readinto(b), 0)
864 self.assertEqual(b, b"gf")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000865
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000866 def test_readlines(self):
867 def bufio():
868 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
869 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000870 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
871 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
872 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000873
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000874 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000875 data = b"abcdefghi"
876 dlen = len(data)
877
878 tests = [
879 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
880 [ 100, [ 3, 3, 3], [ dlen ] ],
881 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
882 ]
883
884 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000885 rawio = self.MockFileIO(data)
886 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000887 pos = 0
888 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +0000889 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000890 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000891 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +0000892 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000893
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000894 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000895 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000896 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
897 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000898 self.assertEqual(b"abcd", bufio.read(6))
899 self.assertEqual(b"e", bufio.read(1))
900 self.assertEqual(b"fg", bufio.read())
901 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +0200902 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000903 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000904
Victor Stinnera80987f2011-05-25 22:47:16 +0200905 rawio = self.MockRawIO((b"a", None, None))
906 self.assertEqual(b"a", rawio.readall())
907 self.assertIsNone(rawio.readall())
908
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000909 def test_read_past_eof(self):
910 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
911 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000912
Ezio Melottib3aedd42010-11-20 19:04:17 +0000913 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000914
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000915 def test_read_all(self):
916 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
917 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000918
Ezio Melottib3aedd42010-11-20 19:04:17 +0000919 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000920
Victor Stinner45df8202010-04-28 22:31:17 +0000921 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +0000922 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000923 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000924 try:
925 # Write out many bytes with exactly the same number of 0's,
926 # 1's... 255's. This will help us check that concurrent reading
927 # doesn't duplicate or forget contents.
928 N = 1000
929 l = list(range(256)) * N
930 random.shuffle(l)
931 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000932 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +0000933 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000934 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000935 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000936 errors = []
937 results = []
938 def f():
939 try:
940 # Intra-buffer read then buffer-flushing read
941 for n in cycle([1, 19]):
942 s = bufio.read(n)
943 if not s:
944 break
945 # list.append() is atomic
946 results.append(s)
947 except Exception as e:
948 errors.append(e)
949 raise
950 threads = [threading.Thread(target=f) for x in range(20)]
951 for t in threads:
952 t.start()
953 time.sleep(0.02) # yield
954 for t in threads:
955 t.join()
956 self.assertFalse(errors,
957 "the following exceptions were caught: %r" % errors)
958 s = b''.join(results)
959 for i in range(256):
960 c = bytes(bytearray([i]))
961 self.assertEqual(s.count(c), N)
962 finally:
963 support.unlink(support.TESTFN)
964
Antoine Pitrou1e44fec2011-10-04 12:26:20 +0200965 def test_unseekable(self):
966 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
967 self.assertRaises(self.UnsupportedOperation, bufio.tell)
968 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
969 bufio.read(1)
970 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
971 self.assertRaises(self.UnsupportedOperation, bufio.tell)
972
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000973 def test_misbehaved_io(self):
974 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
975 bufio = self.tp(rawio)
976 self.assertRaises(IOError, bufio.seek, 0)
977 self.assertRaises(IOError, bufio.tell)
978
Antoine Pitrou32cfede2010-08-11 13:31:33 +0000979 def test_no_extraneous_read(self):
980 # Issue #9550; when the raw IO object has satisfied the read request,
981 # we should not issue any additional reads, otherwise it may block
982 # (e.g. socket).
983 bufsize = 16
984 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
985 rawio = self.MockRawIO([b"x" * n])
986 bufio = self.tp(rawio, bufsize)
987 self.assertEqual(bufio.read(n), b"x" * n)
988 # Simple case: one raw read is enough to satisfy the request.
989 self.assertEqual(rawio._extraneous_reads, 0,
990 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
991 # A more complex case where two raw reads are needed to satisfy
992 # the request.
993 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
994 bufio = self.tp(rawio, bufsize)
995 self.assertEqual(bufio.read(n), b"x" * n)
996 self.assertEqual(rawio._extraneous_reads, 0,
997 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
998
999
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001000class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001001 tp = io.BufferedReader
1002
1003 def test_constructor(self):
1004 BufferedReaderTest.test_constructor(self)
1005 # The allocation can succeed on 32-bit builds, e.g. with more
1006 # than 2GB RAM and a 64-bit kernel.
1007 if sys.maxsize > 0x7FFFFFFF:
1008 rawio = self.MockRawIO()
1009 bufio = self.tp(rawio)
1010 self.assertRaises((OverflowError, MemoryError, ValueError),
1011 bufio.__init__, rawio, sys.maxsize)
1012
1013 def test_initialization(self):
1014 rawio = self.MockRawIO([b"abc"])
1015 bufio = self.tp(rawio)
1016 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1017 self.assertRaises(ValueError, bufio.read)
1018 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1019 self.assertRaises(ValueError, bufio.read)
1020 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1021 self.assertRaises(ValueError, bufio.read)
1022
1023 def test_misbehaved_io_read(self):
1024 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1025 bufio = self.tp(rawio)
1026 # _pyio.BufferedReader seems to implement reading different, so that
1027 # checking this is not so easy.
1028 self.assertRaises(IOError, bufio.read, 10)
1029
1030 def test_garbage_collection(self):
1031 # C BufferedReader objects are collected.
1032 # The Python version has __del__, so it ends into gc.garbage instead
1033 rawio = self.FileIO(support.TESTFN, "w+b")
1034 f = self.tp(rawio)
1035 f.f = f
1036 wr = weakref.ref(f)
1037 del f
Benjamin Peterson45cec322009-04-24 23:14:50 +00001038 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001039 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001040
1041class PyBufferedReaderTest(BufferedReaderTest):
1042 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001043
Guido van Rossuma9e20242007-03-08 00:43:48 +00001044
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001045class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1046 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001047
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001048 def test_constructor(self):
1049 rawio = self.MockRawIO()
1050 bufio = self.tp(rawio)
1051 bufio.__init__(rawio)
1052 bufio.__init__(rawio, buffer_size=1024)
1053 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001054 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001055 bufio.flush()
1056 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1057 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1058 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1059 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001060 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001061 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001062 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001063
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001064 def test_detach_flush(self):
1065 raw = self.MockRawIO()
1066 buf = self.tp(raw)
1067 buf.write(b"howdy!")
1068 self.assertFalse(raw._write_stack)
1069 buf.detach()
1070 self.assertEqual(raw._write_stack, [b"howdy!"])
1071
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001072 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001073 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001074 writer = self.MockRawIO()
1075 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001076 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001077 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001078
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001079 def test_write_overflow(self):
1080 writer = self.MockRawIO()
1081 bufio = self.tp(writer, 8)
1082 contents = b"abcdefghijklmnop"
1083 for n in range(0, len(contents), 3):
1084 bufio.write(contents[n:n+3])
1085 flushed = b"".join(writer._write_stack)
1086 # At least (total - 8) bytes were implicitly flushed, perhaps more
1087 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001088 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001089
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001090 def check_writes(self, intermediate_func):
1091 # Lots of writes, test the flushed output is as expected.
1092 contents = bytes(range(256)) * 1000
1093 n = 0
1094 writer = self.MockRawIO()
1095 bufio = self.tp(writer, 13)
1096 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1097 def gen_sizes():
1098 for size in count(1):
1099 for i in range(15):
1100 yield size
1101 sizes = gen_sizes()
1102 while n < len(contents):
1103 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001104 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001105 intermediate_func(bufio)
1106 n += size
1107 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001108 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001109
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001110 def test_writes(self):
1111 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001112
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001113 def test_writes_and_flushes(self):
1114 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001115
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001116 def test_writes_and_seeks(self):
1117 def _seekabs(bufio):
1118 pos = bufio.tell()
1119 bufio.seek(pos + 1, 0)
1120 bufio.seek(pos - 1, 0)
1121 bufio.seek(pos, 0)
1122 self.check_writes(_seekabs)
1123 def _seekrel(bufio):
1124 pos = bufio.seek(0, 1)
1125 bufio.seek(+1, 1)
1126 bufio.seek(-1, 1)
1127 bufio.seek(pos, 0)
1128 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001129
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001130 def test_writes_and_truncates(self):
1131 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001132
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001133 def test_write_non_blocking(self):
1134 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001135 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001136
Ezio Melottib3aedd42010-11-20 19:04:17 +00001137 self.assertEqual(bufio.write(b"abcd"), 4)
1138 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001139 # 1 byte will be written, the rest will be buffered
1140 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001141 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001142
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001143 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1144 raw.block_on(b"0")
1145 try:
1146 bufio.write(b"opqrwxyz0123456789")
1147 except self.BlockingIOError as e:
1148 written = e.characters_written
1149 else:
1150 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001151 self.assertEqual(written, 16)
1152 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001153 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001154
Ezio Melottib3aedd42010-11-20 19:04:17 +00001155 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001156 s = raw.pop_written()
1157 # Previously buffered bytes were flushed
1158 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001159
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001160 def test_write_and_rewind(self):
1161 raw = io.BytesIO()
1162 bufio = self.tp(raw, 4)
1163 self.assertEqual(bufio.write(b"abcdef"), 6)
1164 self.assertEqual(bufio.tell(), 6)
1165 bufio.seek(0, 0)
1166 self.assertEqual(bufio.write(b"XY"), 2)
1167 bufio.seek(6, 0)
1168 self.assertEqual(raw.getvalue(), b"XYcdef")
1169 self.assertEqual(bufio.write(b"123456"), 6)
1170 bufio.flush()
1171 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001172
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001173 def test_flush(self):
1174 writer = self.MockRawIO()
1175 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001176 bufio.write(b"abc")
1177 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001178 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001179
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001180 def test_destructor(self):
1181 writer = self.MockRawIO()
1182 bufio = self.tp(writer, 8)
1183 bufio.write(b"abc")
1184 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001185 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001186 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001187
1188 def test_truncate(self):
1189 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001190 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001191 bufio = self.tp(raw, 8)
1192 bufio.write(b"abcdef")
1193 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001194 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001195 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001196 self.assertEqual(f.read(), b"abc")
1197
Victor Stinner45df8202010-04-28 22:31:17 +00001198 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001199 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001200 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001201 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001202 # Write out many bytes from many threads and test they were
1203 # all flushed.
1204 N = 1000
1205 contents = bytes(range(256)) * N
1206 sizes = cycle([1, 19])
1207 n = 0
1208 queue = deque()
1209 while n < len(contents):
1210 size = next(sizes)
1211 queue.append(contents[n:n+size])
1212 n += size
1213 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001214 # We use a real file object because it allows us to
1215 # exercise situations where the GIL is released before
1216 # writing the buffer to the raw streams. This is in addition
1217 # to concurrency issues due to switching threads in the middle
1218 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001219 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001220 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001221 errors = []
1222 def f():
1223 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001224 while True:
1225 try:
1226 s = queue.popleft()
1227 except IndexError:
1228 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001229 bufio.write(s)
1230 except Exception as e:
1231 errors.append(e)
1232 raise
1233 threads = [threading.Thread(target=f) for x in range(20)]
1234 for t in threads:
1235 t.start()
1236 time.sleep(0.02) # yield
1237 for t in threads:
1238 t.join()
1239 self.assertFalse(errors,
1240 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001241 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001242 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001243 s = f.read()
1244 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001245 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001246 finally:
1247 support.unlink(support.TESTFN)
1248
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001249 def test_misbehaved_io(self):
1250 rawio = self.MisbehavedRawIO()
1251 bufio = self.tp(rawio, 5)
1252 self.assertRaises(IOError, bufio.seek, 0)
1253 self.assertRaises(IOError, bufio.tell)
1254 self.assertRaises(IOError, bufio.write, b"abcdef")
1255
Benjamin Peterson59406a92009-03-26 17:10:29 +00001256 def test_max_buffer_size_deprecation(self):
Florent Xicluna8fbddf12010-03-17 20:29:51 +00001257 with support.check_warnings(("max_buffer_size is deprecated",
1258 DeprecationWarning)):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001259 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001260
1261
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001262class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001263 tp = io.BufferedWriter
1264
1265 def test_constructor(self):
1266 BufferedWriterTest.test_constructor(self)
1267 # The allocation can succeed on 32-bit builds, e.g. with more
1268 # than 2GB RAM and a 64-bit kernel.
1269 if sys.maxsize > 0x7FFFFFFF:
1270 rawio = self.MockRawIO()
1271 bufio = self.tp(rawio)
1272 self.assertRaises((OverflowError, MemoryError, ValueError),
1273 bufio.__init__, rawio, sys.maxsize)
1274
1275 def test_initialization(self):
1276 rawio = self.MockRawIO()
1277 bufio = self.tp(rawio)
1278 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1279 self.assertRaises(ValueError, bufio.write, b"def")
1280 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1281 self.assertRaises(ValueError, bufio.write, b"def")
1282 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1283 self.assertRaises(ValueError, bufio.write, b"def")
1284
1285 def test_garbage_collection(self):
1286 # C BufferedWriter objects are collected, and collecting them flushes
1287 # all data to disk.
1288 # The Python version has __del__, so it ends into gc.garbage instead
1289 rawio = self.FileIO(support.TESTFN, "w+b")
1290 f = self.tp(rawio)
1291 f.write(b"123xxx")
1292 f.x = f
1293 wr = weakref.ref(f)
1294 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001295 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001296 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001297 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001298 self.assertEqual(f.read(), b"123xxx")
1299
1300
1301class PyBufferedWriterTest(BufferedWriterTest):
1302 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001303
Guido van Rossum01a27522007-03-07 01:00:12 +00001304class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001305
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001306 def test_constructor(self):
1307 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001308 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001309
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001310 def test_detach(self):
1311 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1312 self.assertRaises(self.UnsupportedOperation, pair.detach)
1313
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001314 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna8fbddf12010-03-17 20:29:51 +00001315 with support.check_warnings(("max_buffer_size is deprecated",
1316 DeprecationWarning)):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001317 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001318
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001319 def test_constructor_with_not_readable(self):
1320 class NotReadable(MockRawIO):
1321 def readable(self):
1322 return False
1323
1324 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1325
1326 def test_constructor_with_not_writeable(self):
1327 class NotWriteable(MockRawIO):
1328 def writable(self):
1329 return False
1330
1331 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1332
1333 def test_read(self):
1334 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1335
1336 self.assertEqual(pair.read(3), b"abc")
1337 self.assertEqual(pair.read(1), b"d")
1338 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001339 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1340 self.assertEqual(pair.read(None), b"abc")
1341
1342 def test_readlines(self):
1343 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1344 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1345 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1346 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001347
1348 def test_read1(self):
1349 # .read1() is delegated to the underlying reader object, so this test
1350 # can be shallow.
1351 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1352
1353 self.assertEqual(pair.read1(3), b"abc")
1354
1355 def test_readinto(self):
1356 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1357
1358 data = bytearray(5)
1359 self.assertEqual(pair.readinto(data), 5)
1360 self.assertEqual(data, b"abcde")
1361
1362 def test_write(self):
1363 w = self.MockRawIO()
1364 pair = self.tp(self.MockRawIO(), w)
1365
1366 pair.write(b"abc")
1367 pair.flush()
1368 pair.write(b"def")
1369 pair.flush()
1370 self.assertEqual(w._write_stack, [b"abc", b"def"])
1371
1372 def test_peek(self):
1373 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1374
1375 self.assertTrue(pair.peek(3).startswith(b"abc"))
1376 self.assertEqual(pair.read(3), b"abc")
1377
1378 def test_readable(self):
1379 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1380 self.assertTrue(pair.readable())
1381
1382 def test_writeable(self):
1383 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1384 self.assertTrue(pair.writable())
1385
1386 def test_seekable(self):
1387 # BufferedRWPairs are never seekable, even if their readers and writers
1388 # are.
1389 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1390 self.assertFalse(pair.seekable())
1391
1392 # .flush() is delegated to the underlying writer object and has been
1393 # tested in the test_write method.
1394
1395 def test_close_and_closed(self):
1396 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1397 self.assertFalse(pair.closed)
1398 pair.close()
1399 self.assertTrue(pair.closed)
1400
1401 def test_isatty(self):
1402 class SelectableIsAtty(MockRawIO):
1403 def __init__(self, isatty):
1404 MockRawIO.__init__(self)
1405 self._isatty = isatty
1406
1407 def isatty(self):
1408 return self._isatty
1409
1410 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1411 self.assertFalse(pair.isatty())
1412
1413 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1414 self.assertTrue(pair.isatty())
1415
1416 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1417 self.assertTrue(pair.isatty())
1418
1419 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1420 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001421
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001422class CBufferedRWPairTest(BufferedRWPairTest):
1423 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001424
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001425class PyBufferedRWPairTest(BufferedRWPairTest):
1426 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001427
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001428
1429class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1430 read_mode = "rb+"
1431 write_mode = "wb+"
1432
1433 def test_constructor(self):
1434 BufferedReaderTest.test_constructor(self)
1435 BufferedWriterTest.test_constructor(self)
1436
1437 def test_read_and_write(self):
1438 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001439 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001440
1441 self.assertEqual(b"as", rw.read(2))
1442 rw.write(b"ddd")
1443 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001444 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001445 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001446 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001447
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001448 def test_seek_and_tell(self):
1449 raw = self.BytesIO(b"asdfghjkl")
1450 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001451
Ezio Melottib3aedd42010-11-20 19:04:17 +00001452 self.assertEqual(b"as", rw.read(2))
1453 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001454 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001455 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00001456
Antoine Pitroue05565e2011-08-20 14:39:23 +02001457 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00001458 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02001459 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001460 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001461 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001462 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001463 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001464 self.assertEqual(7, rw.tell())
1465 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02001466 rw.flush()
1467 self.assertEqual(b"asdf123fl", raw.getvalue())
1468
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001469 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001470
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001471 def check_flush_and_read(self, read_func):
1472 raw = self.BytesIO(b"abcdefghi")
1473 bufio = self.tp(raw)
1474
Ezio Melottib3aedd42010-11-20 19:04:17 +00001475 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001476 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001477 self.assertEqual(b"ef", read_func(bufio, 2))
1478 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001479 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001480 self.assertEqual(6, bufio.tell())
1481 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001482 raw.seek(0, 0)
1483 raw.write(b"XYZ")
1484 # flush() resets the read buffer
1485 bufio.flush()
1486 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001487 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001488
1489 def test_flush_and_read(self):
1490 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1491
1492 def test_flush_and_readinto(self):
1493 def _readinto(bufio, n=-1):
1494 b = bytearray(n if n >= 0 else 9999)
1495 n = bufio.readinto(b)
1496 return bytes(b[:n])
1497 self.check_flush_and_read(_readinto)
1498
1499 def test_flush_and_peek(self):
1500 def _peek(bufio, n=-1):
1501 # This relies on the fact that the buffer can contain the whole
1502 # raw stream, otherwise peek() can return less.
1503 b = bufio.peek(n)
1504 if n != -1:
1505 b = b[:n]
1506 bufio.seek(len(b), 1)
1507 return b
1508 self.check_flush_and_read(_peek)
1509
1510 def test_flush_and_write(self):
1511 raw = self.BytesIO(b"abcdefghi")
1512 bufio = self.tp(raw)
1513
1514 bufio.write(b"123")
1515 bufio.flush()
1516 bufio.write(b"45")
1517 bufio.flush()
1518 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001519 self.assertEqual(b"12345fghi", raw.getvalue())
1520 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001521
1522 def test_threads(self):
1523 BufferedReaderTest.test_threads(self)
1524 BufferedWriterTest.test_threads(self)
1525
1526 def test_writes_and_peek(self):
1527 def _peek(bufio):
1528 bufio.peek(1)
1529 self.check_writes(_peek)
1530 def _peek(bufio):
1531 pos = bufio.tell()
1532 bufio.seek(-1, 1)
1533 bufio.peek(1)
1534 bufio.seek(pos, 0)
1535 self.check_writes(_peek)
1536
1537 def test_writes_and_reads(self):
1538 def _read(bufio):
1539 bufio.seek(-1, 1)
1540 bufio.read(1)
1541 self.check_writes(_read)
1542
1543 def test_writes_and_read1s(self):
1544 def _read1(bufio):
1545 bufio.seek(-1, 1)
1546 bufio.read1(1)
1547 self.check_writes(_read1)
1548
1549 def test_writes_and_readintos(self):
1550 def _read(bufio):
1551 bufio.seek(-1, 1)
1552 bufio.readinto(bytearray(1))
1553 self.check_writes(_read)
1554
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001555 def test_write_after_readahead(self):
1556 # Issue #6629: writing after the buffer was filled by readahead should
1557 # first rewind the raw stream.
1558 for overwrite_size in [1, 5]:
1559 raw = self.BytesIO(b"A" * 10)
1560 bufio = self.tp(raw, 4)
1561 # Trigger readahead
1562 self.assertEqual(bufio.read(1), b"A")
1563 self.assertEqual(bufio.tell(), 1)
1564 # Overwriting should rewind the raw stream if it needs so
1565 bufio.write(b"B" * overwrite_size)
1566 self.assertEqual(bufio.tell(), overwrite_size + 1)
1567 # If the write size was smaller than the buffer size, flush() and
1568 # check that rewind happens.
1569 bufio.flush()
1570 self.assertEqual(bufio.tell(), overwrite_size + 1)
1571 s = raw.getvalue()
1572 self.assertEqual(s,
1573 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1574
Antoine Pitrou7c404892011-05-13 00:13:33 +02001575 def test_write_rewind_write(self):
1576 # Various combinations of reading / writing / seeking backwards / writing again
1577 def mutate(bufio, pos1, pos2):
1578 assert pos2 >= pos1
1579 # Fill the buffer
1580 bufio.seek(pos1)
1581 bufio.read(pos2 - pos1)
1582 bufio.write(b'\x02')
1583 # This writes earlier than the previous write, but still inside
1584 # the buffer.
1585 bufio.seek(pos1)
1586 bufio.write(b'\x01')
1587
1588 b = b"\x80\x81\x82\x83\x84"
1589 for i in range(0, len(b)):
1590 for j in range(i, len(b)):
1591 raw = self.BytesIO(b)
1592 bufio = self.tp(raw, 100)
1593 mutate(bufio, i, j)
1594 bufio.flush()
1595 expected = bytearray(b)
1596 expected[j] = 2
1597 expected[i] = 1
1598 self.assertEqual(raw.getvalue(), expected,
1599 "failed result for i=%d, j=%d" % (i, j))
1600
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001601 def test_truncate_after_read_or_write(self):
1602 raw = self.BytesIO(b"A" * 10)
1603 bufio = self.tp(raw, 100)
1604 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1605 self.assertEqual(bufio.truncate(), 2)
1606 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1607 self.assertEqual(bufio.truncate(), 4)
1608
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001609 def test_misbehaved_io(self):
1610 BufferedReaderTest.test_misbehaved_io(self)
1611 BufferedWriterTest.test_misbehaved_io(self)
1612
Antoine Pitroue05565e2011-08-20 14:39:23 +02001613 def test_interleaved_read_write(self):
1614 # Test for issue #12213
1615 with self.BytesIO(b'abcdefgh') as raw:
1616 with self.tp(raw, 100) as f:
1617 f.write(b"1")
1618 self.assertEqual(f.read(1), b'b')
1619 f.write(b'2')
1620 self.assertEqual(f.read1(1), b'd')
1621 f.write(b'3')
1622 buf = bytearray(1)
1623 f.readinto(buf)
1624 self.assertEqual(buf, b'f')
1625 f.write(b'4')
1626 self.assertEqual(f.peek(1), b'h')
1627 f.flush()
1628 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1629
1630 with self.BytesIO(b'abc') as raw:
1631 with self.tp(raw, 100) as f:
1632 self.assertEqual(f.read(1), b'a')
1633 f.write(b"2")
1634 self.assertEqual(f.read(1), b'c')
1635 f.flush()
1636 self.assertEqual(raw.getvalue(), b'a2c')
1637
1638 def test_interleaved_readline_write(self):
1639 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1640 with self.tp(raw) as f:
1641 f.write(b'1')
1642 self.assertEqual(f.readline(), b'b\n')
1643 f.write(b'2')
1644 self.assertEqual(f.readline(), b'def\n')
1645 f.write(b'3')
1646 self.assertEqual(f.readline(), b'\n')
1647 f.flush()
1648 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1649
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001650 # You can't construct a BufferedRandom over a non-seekable stream.
1651 test_unseekable = None
1652
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001653class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001654 tp = io.BufferedRandom
1655
1656 def test_constructor(self):
1657 BufferedRandomTest.test_constructor(self)
1658 # The allocation can succeed on 32-bit builds, e.g. with more
1659 # than 2GB RAM and a 64-bit kernel.
1660 if sys.maxsize > 0x7FFFFFFF:
1661 rawio = self.MockRawIO()
1662 bufio = self.tp(rawio)
1663 self.assertRaises((OverflowError, MemoryError, ValueError),
1664 bufio.__init__, rawio, sys.maxsize)
1665
1666 def test_garbage_collection(self):
1667 CBufferedReaderTest.test_garbage_collection(self)
1668 CBufferedWriterTest.test_garbage_collection(self)
1669
1670class PyBufferedRandomTest(BufferedRandomTest):
1671 tp = pyio.BufferedRandom
1672
1673
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001674# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1675# properties:
1676# - A single output character can correspond to many bytes of input.
1677# - The number of input bytes to complete the character can be
1678# undetermined until the last input byte is received.
1679# - The number of input bytes can vary depending on previous input.
1680# - A single input byte can correspond to many characters of output.
1681# - The number of output characters can be undetermined until the
1682# last input byte is received.
1683# - The number of output characters can vary depending on previous input.
1684
1685class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1686 """
1687 For testing seek/tell behavior with a stateful, buffering decoder.
1688
1689 Input is a sequence of words. Words may be fixed-length (length set
1690 by input) or variable-length (period-terminated). In variable-length
1691 mode, extra periods are ignored. Possible words are:
1692 - 'i' followed by a number sets the input length, I (maximum 99).
1693 When I is set to 0, words are space-terminated.
1694 - 'o' followed by a number sets the output length, O (maximum 99).
1695 - Any other word is converted into a word followed by a period on
1696 the output. The output word consists of the input word truncated
1697 or padded out with hyphens to make its length equal to O. If O
1698 is 0, the word is output verbatim without truncating or padding.
1699 I and O are initially set to 1. When I changes, any buffered input is
1700 re-scanned according to the new I. EOF also terminates the last word.
1701 """
1702
1703 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001704 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001705 self.reset()
1706
1707 def __repr__(self):
1708 return '<SID %x>' % id(self)
1709
1710 def reset(self):
1711 self.i = 1
1712 self.o = 1
1713 self.buffer = bytearray()
1714
1715 def getstate(self):
1716 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1717 return bytes(self.buffer), i*100 + o
1718
1719 def setstate(self, state):
1720 buffer, io = state
1721 self.buffer = bytearray(buffer)
1722 i, o = divmod(io, 100)
1723 self.i, self.o = i ^ 1, o ^ 1
1724
1725 def decode(self, input, final=False):
1726 output = ''
1727 for b in input:
1728 if self.i == 0: # variable-length, terminated with period
1729 if b == ord('.'):
1730 if self.buffer:
1731 output += self.process_word()
1732 else:
1733 self.buffer.append(b)
1734 else: # fixed-length, terminate after self.i bytes
1735 self.buffer.append(b)
1736 if len(self.buffer) == self.i:
1737 output += self.process_word()
1738 if final and self.buffer: # EOF terminates the last word
1739 output += self.process_word()
1740 return output
1741
1742 def process_word(self):
1743 output = ''
1744 if self.buffer[0] == ord('i'):
1745 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1746 elif self.buffer[0] == ord('o'):
1747 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1748 else:
1749 output = self.buffer.decode('ascii')
1750 if len(output) < self.o:
1751 output += '-'*self.o # pad out with hyphens
1752 if self.o:
1753 output = output[:self.o] # truncate to output length
1754 output += '.'
1755 self.buffer = bytearray()
1756 return output
1757
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001758 codecEnabled = False
1759
1760 @classmethod
1761 def lookupTestDecoder(cls, name):
1762 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001763 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001764 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001765 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001766 incrementalencoder=None,
1767 streamreader=None, streamwriter=None,
1768 incrementaldecoder=cls)
1769
1770# Register the previous decoder for testing.
1771# Disabled by default, tests will enable it.
1772codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1773
1774
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001775class StatefulIncrementalDecoderTest(unittest.TestCase):
1776 """
1777 Make sure the StatefulIncrementalDecoder actually works.
1778 """
1779
1780 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001781 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001782 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001783 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001784 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001785 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001786 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001787 # I=0, O=6 (variable-length input, fixed-length output)
1788 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1789 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001790 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001791 # I=6, O=3 (fixed-length input > fixed-length output)
1792 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1793 # I=0, then 3; O=29, then 15 (with longer output)
1794 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1795 'a----------------------------.' +
1796 'b----------------------------.' +
1797 'cde--------------------------.' +
1798 'abcdefghijabcde.' +
1799 'a.b------------.' +
1800 '.c.------------.' +
1801 'd.e------------.' +
1802 'k--------------.' +
1803 'l--------------.' +
1804 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001805 ]
1806
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001807 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001808 # Try a few one-shot test cases.
1809 for input, eof, output in self.test_cases:
1810 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001811 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001812
1813 # Also test an unfinished decode, followed by forcing EOF.
1814 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001815 self.assertEqual(d.decode(b'oiabcd'), '')
1816 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001817
1818class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001819
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001820 def setUp(self):
1821 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1822 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001823 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001824
Guido van Rossumd0712812007-04-11 16:32:43 +00001825 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001826 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001827
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001828 def test_constructor(self):
1829 r = self.BytesIO(b"\xc3\xa9\n\n")
1830 b = self.BufferedReader(r, 1000)
1831 t = self.TextIOWrapper(b)
1832 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001833 self.assertEqual(t.encoding, "latin1")
1834 self.assertEqual(t.line_buffering, False)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001835 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001836 self.assertEqual(t.encoding, "utf8")
1837 self.assertEqual(t.line_buffering, True)
1838 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001839 self.assertRaises(TypeError, t.__init__, b, newline=42)
1840 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1841
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001842 def test_detach(self):
1843 r = self.BytesIO()
1844 b = self.BufferedWriter(r)
1845 t = self.TextIOWrapper(b)
1846 self.assertIs(t.detach(), b)
1847
1848 t = self.TextIOWrapper(b, encoding="ascii")
1849 t.write("howdy")
1850 self.assertFalse(r.getvalue())
1851 t.detach()
1852 self.assertEqual(r.getvalue(), b"howdy")
1853 self.assertRaises(ValueError, t.detach)
1854
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001855 def test_repr(self):
1856 raw = self.BytesIO("hello".encode("utf-8"))
1857 b = self.BufferedReader(raw)
1858 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001859 modname = self.TextIOWrapper.__module__
1860 self.assertEqual(repr(t),
1861 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1862 raw.name = "dummy"
1863 self.assertEqual(repr(t),
1864 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001865 t.mode = "r"
1866 self.assertEqual(repr(t),
1867 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001868 raw.name = b"dummy"
1869 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001870 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001871
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001872 def test_line_buffering(self):
1873 r = self.BytesIO()
1874 b = self.BufferedWriter(r, 1000)
1875 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001876 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001877 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001878 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001879 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001880 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001881 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001882
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001883 def test_encoding(self):
1884 # Check the encoding attribute is always set, and valid
1885 b = self.BytesIO()
1886 t = self.TextIOWrapper(b, encoding="utf8")
1887 self.assertEqual(t.encoding, "utf8")
1888 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001889 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001890 codecs.lookup(t.encoding)
1891
1892 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001893 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001894 b = self.BytesIO(b"abc\n\xff\n")
1895 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001896 self.assertRaises(UnicodeError, t.read)
1897 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001898 b = self.BytesIO(b"abc\n\xff\n")
1899 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001900 self.assertRaises(UnicodeError, t.read)
1901 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001902 b = self.BytesIO(b"abc\n\xff\n")
1903 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001904 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001905 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001906 b = self.BytesIO(b"abc\n\xff\n")
1907 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001908 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001909
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001910 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001911 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001912 b = self.BytesIO()
1913 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001914 self.assertRaises(UnicodeError, t.write, "\xff")
1915 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001916 b = self.BytesIO()
1917 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001918 self.assertRaises(UnicodeError, t.write, "\xff")
1919 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001920 b = self.BytesIO()
1921 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001922 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001923 t.write("abc\xffdef\n")
1924 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001925 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001926 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001927 b = self.BytesIO()
1928 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001929 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001930 t.write("abc\xffdef\n")
1931 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001932 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001933
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001934 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001935 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1936
1937 tests = [
1938 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001939 [ '', input_lines ],
1940 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1941 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1942 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001943 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001944 encodings = (
1945 'utf-8', 'latin-1',
1946 'utf-16', 'utf-16-le', 'utf-16-be',
1947 'utf-32', 'utf-32-le', 'utf-32-be',
1948 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001949
Guido van Rossum8358db22007-08-18 21:39:55 +00001950 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001951 # character in TextIOWrapper._pending_line.
1952 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001953 # XXX: str.encode() should return bytes
1954 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001955 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001956 for bufsize in range(1, 10):
1957 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001958 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1959 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001960 encoding=encoding)
1961 if do_reads:
1962 got_lines = []
1963 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001964 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001965 if c2 == '':
1966 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00001967 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001968 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001969 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001970 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001971
1972 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001973 self.assertEqual(got_line, exp_line)
1974 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00001975
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001976 def test_newlines_input(self):
1977 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001978 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1979 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001980 (None, normalized.decode("ascii").splitlines(True)),
1981 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001982 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1983 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1984 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001985 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001986 buf = self.BytesIO(testdata)
1987 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001988 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00001989 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001990 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00001991
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001992 def test_newlines_output(self):
1993 testdict = {
1994 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1995 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1996 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1997 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1998 }
1999 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2000 for newline, expected in tests:
2001 buf = self.BytesIO()
2002 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2003 txt.write("AAA\nB")
2004 txt.write("BB\nCCC\n")
2005 txt.write("X\rY\r\nZ")
2006 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002007 self.assertEqual(buf.closed, False)
2008 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002009
2010 def test_destructor(self):
2011 l = []
2012 base = self.BytesIO
2013 class MyBytesIO(base):
2014 def close(self):
2015 l.append(self.getvalue())
2016 base.close(self)
2017 b = MyBytesIO()
2018 t = self.TextIOWrapper(b, encoding="ascii")
2019 t.write("abc")
2020 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002021 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002022 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002023
2024 def test_override_destructor(self):
2025 record = []
2026 class MyTextIO(self.TextIOWrapper):
2027 def __del__(self):
2028 record.append(1)
2029 try:
2030 f = super().__del__
2031 except AttributeError:
2032 pass
2033 else:
2034 f()
2035 def close(self):
2036 record.append(2)
2037 super().close()
2038 def flush(self):
2039 record.append(3)
2040 super().flush()
2041 b = self.BytesIO()
2042 t = MyTextIO(b, encoding="ascii")
2043 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002044 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002045 self.assertEqual(record, [1, 2, 3])
2046
2047 def test_error_through_destructor(self):
2048 # Test that the exception state is not modified by a destructor,
2049 # even if close() fails.
2050 rawio = self.CloseFailureIO()
2051 def f():
2052 self.TextIOWrapper(rawio).xyzzy
2053 with support.captured_output("stderr") as s:
2054 self.assertRaises(AttributeError, f)
2055 s = s.getvalue().strip()
2056 if s:
2057 # The destructor *may* have printed an unraisable error, check it
2058 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002059 self.assertTrue(s.startswith("Exception IOError: "), s)
2060 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002061
Guido van Rossum9b76da62007-04-11 01:09:03 +00002062 # Systematic tests of the text I/O API
2063
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002064 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002065 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
2066 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002067 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002068 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002069 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002070 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002071 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002072 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002073 self.assertEqual(f.tell(), 0)
2074 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002075 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002076 self.assertEqual(f.seek(0), 0)
2077 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002078 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002079 self.assertEqual(f.read(2), "ab")
2080 self.assertEqual(f.read(1), "c")
2081 self.assertEqual(f.read(1), "")
2082 self.assertEqual(f.read(), "")
2083 self.assertEqual(f.tell(), cookie)
2084 self.assertEqual(f.seek(0), 0)
2085 self.assertEqual(f.seek(0, 2), cookie)
2086 self.assertEqual(f.write("def"), 3)
2087 self.assertEqual(f.seek(cookie), cookie)
2088 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002089 if enc.startswith("utf"):
2090 self.multi_line_test(f, enc)
2091 f.close()
2092
2093 def multi_line_test(self, f, enc):
2094 f.seek(0)
2095 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002096 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002097 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002098 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002099 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002100 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002101 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002102 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002103 wlines.append((f.tell(), line))
2104 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002105 f.seek(0)
2106 rlines = []
2107 while True:
2108 pos = f.tell()
2109 line = f.readline()
2110 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002111 break
2112 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002113 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002114
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002115 def test_telling(self):
2116 f = self.open(support.TESTFN, "w+", encoding="utf8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002117 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002118 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002119 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002120 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002121 p2 = f.tell()
2122 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002123 self.assertEqual(f.tell(), p0)
2124 self.assertEqual(f.readline(), "\xff\n")
2125 self.assertEqual(f.tell(), p1)
2126 self.assertEqual(f.readline(), "\xff\n")
2127 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002128 f.seek(0)
2129 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002130 self.assertEqual(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002131 self.assertRaises(IOError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002132 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002133 f.close()
2134
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002135 def test_seeking(self):
2136 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002137 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002138 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002139 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002140 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002141 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002142 suffix = bytes(u_suffix.encode("utf-8"))
2143 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002144 with self.open(support.TESTFN, "wb") as f:
2145 f.write(line*2)
2146 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2147 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002148 self.assertEqual(s, str(prefix, "ascii"))
2149 self.assertEqual(f.tell(), prefix_size)
2150 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002151
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002152 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002153 # Regression test for a specific bug
2154 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002155 with self.open(support.TESTFN, "wb") as f:
2156 f.write(data)
2157 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2158 f._CHUNK_SIZE # Just test that it exists
2159 f._CHUNK_SIZE = 2
2160 f.readline()
2161 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002162
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002163 def test_seek_and_tell(self):
2164 #Test seek/tell using the StatefulIncrementalDecoder.
2165 # Make test faster by doing smaller seeks
2166 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002167
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002168 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002169 """Tell/seek to various points within a data stream and ensure
2170 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002171 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002172 f.write(data)
2173 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002174 f = self.open(support.TESTFN, encoding='test_decoder')
2175 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002176 decoded = f.read()
2177 f.close()
2178
Neal Norwitze2b07052008-03-18 19:52:05 +00002179 for i in range(min_pos, len(decoded) + 1): # seek positions
2180 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002181 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002182 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002183 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002184 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002185 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002186 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002187 f.close()
2188
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002189 # Enable the test decoder.
2190 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002191
2192 # Run the tests.
2193 try:
2194 # Try each test case.
2195 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002196 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002197
2198 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002199 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2200 offset = CHUNK_SIZE - len(input)//2
2201 prefix = b'.'*offset
2202 # Don't bother seeking into the prefix (takes too long).
2203 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002204 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002205
2206 # Ensure our test decoder won't interfere with subsequent tests.
2207 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002208 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002209
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002210 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002211 data = "1234567890"
2212 tests = ("utf-16",
2213 "utf-16-le",
2214 "utf-16-be",
2215 "utf-32",
2216 "utf-32-le",
2217 "utf-32-be")
2218 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002219 buf = self.BytesIO()
2220 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002221 # Check if the BOM is written only once (see issue1753).
2222 f.write(data)
2223 f.write(data)
2224 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002225 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002226 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002227 self.assertEqual(f.read(), data * 2)
2228 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002229
Benjamin Petersona1b49012009-03-31 23:11:32 +00002230 def test_unreadable(self):
2231 class UnReadable(self.BytesIO):
2232 def readable(self):
2233 return False
2234 txt = self.TextIOWrapper(UnReadable())
2235 self.assertRaises(IOError, txt.read)
2236
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002237 def test_read_one_by_one(self):
2238 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002239 reads = ""
2240 while True:
2241 c = txt.read(1)
2242 if not c:
2243 break
2244 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002245 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002246
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002247 def test_readlines(self):
2248 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2249 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2250 txt.seek(0)
2251 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2252 txt.seek(0)
2253 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2254
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002255 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002256 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002257 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002258 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002259 reads = ""
2260 while True:
2261 c = txt.read(128)
2262 if not c:
2263 break
2264 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002265 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002266
2267 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002268 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002269
2270 # read one char at a time
2271 reads = ""
2272 while True:
2273 c = txt.read(1)
2274 if not c:
2275 break
2276 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002277 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002278
2279 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002280 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002281 txt._CHUNK_SIZE = 4
2282
2283 reads = ""
2284 while True:
2285 c = txt.read(4)
2286 if not c:
2287 break
2288 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002289 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002290
2291 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002292 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002293 txt._CHUNK_SIZE = 4
2294
2295 reads = txt.read(4)
2296 reads += txt.read(4)
2297 reads += txt.readline()
2298 reads += txt.readline()
2299 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002300 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002301
2302 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002303 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002304 txt._CHUNK_SIZE = 4
2305
2306 reads = txt.read(4)
2307 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002308 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002309
2310 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002311 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002312 txt._CHUNK_SIZE = 4
2313
2314 reads = txt.read(4)
2315 pos = txt.tell()
2316 txt.seek(0)
2317 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002318 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002319
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002320 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002321 buffer = self.BytesIO(self.testdata)
2322 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002323
2324 self.assertEqual(buffer.seekable(), txt.seekable())
2325
Antoine Pitroue4501852009-05-14 18:55:55 +00002326 def test_append_bom(self):
2327 # The BOM is not written again when appending to a non-empty file
2328 filename = support.TESTFN
2329 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2330 with self.open(filename, 'w', encoding=charset) as f:
2331 f.write('aaa')
2332 pos = f.tell()
2333 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002334 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002335
2336 with self.open(filename, 'a', encoding=charset) as f:
2337 f.write('xxx')
2338 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002339 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002340
2341 def test_seek_bom(self):
2342 # Same test, but when seeking manually
2343 filename = support.TESTFN
2344 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2345 with self.open(filename, 'w', encoding=charset) as f:
2346 f.write('aaa')
2347 pos = f.tell()
2348 with self.open(filename, 'r+', encoding=charset) as f:
2349 f.seek(pos)
2350 f.write('zzz')
2351 f.seek(0)
2352 f.write('bbb')
2353 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002354 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002355
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002356 def test_errors_property(self):
2357 with self.open(support.TESTFN, "w") as f:
2358 self.assertEqual(f.errors, "strict")
2359 with self.open(support.TESTFN, "w", errors="replace") as f:
2360 self.assertEqual(f.errors, "replace")
2361
Victor Stinner45df8202010-04-28 22:31:17 +00002362 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002363 def test_threads_write(self):
2364 # Issue6750: concurrent writes could duplicate data
2365 event = threading.Event()
2366 with self.open(support.TESTFN, "w", buffering=1) as f:
2367 def run(n):
2368 text = "Thread%03d\n" % n
2369 event.wait()
2370 f.write(text)
2371 threads = [threading.Thread(target=lambda n=x: run(n))
2372 for x in range(20)]
2373 for t in threads:
2374 t.start()
2375 time.sleep(0.02)
2376 event.set()
2377 for t in threads:
2378 t.join()
2379 with self.open(support.TESTFN) as f:
2380 content = f.read()
2381 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002382 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002383
Antoine Pitrou6be88762010-05-03 16:48:20 +00002384 def test_flush_error_on_close(self):
2385 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2386 def bad_flush():
2387 raise IOError()
2388 txt.flush = bad_flush
2389 self.assertRaises(IOError, txt.close) # exception not swallowed
2390
2391 def test_multi_close(self):
2392 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2393 txt.close()
2394 txt.close()
2395 txt.close()
2396 self.assertRaises(ValueError, txt.flush)
2397
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002398 def test_unseekable(self):
2399 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2400 self.assertRaises(self.UnsupportedOperation, txt.tell)
2401 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2402
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002403 def test_readonly_attributes(self):
2404 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2405 buf = self.BytesIO(self.testdata)
2406 with self.assertRaises(AttributeError):
2407 txt.buffer = buf
2408
Antoine Pitroue96ec682011-07-23 21:46:35 +02002409 def test_rawio(self):
2410 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
2411 # that subprocess.Popen() can have the required unbuffered
2412 # semantics with universal_newlines=True.
2413 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2414 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
2415 # Reads
2416 self.assertEqual(txt.read(4), 'abcd')
2417 self.assertEqual(txt.readline(), 'efghi\n')
2418 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
2419
2420 def test_rawio_write_through(self):
2421 # Issue #12591: with write_through=True, writes don't need a flush
2422 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2423 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
2424 write_through=True)
2425 txt.write('1')
2426 txt.write('23\n4')
2427 txt.write('5')
2428 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2429
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002430class CTextIOWrapperTest(TextIOWrapperTest):
2431
2432 def test_initialization(self):
2433 r = self.BytesIO(b"\xc3\xa9\n\n")
2434 b = self.BufferedReader(r, 1000)
2435 t = self.TextIOWrapper(b)
2436 self.assertRaises(TypeError, t.__init__, b, newline=42)
2437 self.assertRaises(ValueError, t.read)
2438 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2439 self.assertRaises(ValueError, t.read)
2440
2441 def test_garbage_collection(self):
2442 # C TextIOWrapper objects are collected, and collecting them flushes
2443 # all data to disk.
2444 # The Python version has __del__, so it ends in gc.garbage instead.
2445 rawio = io.FileIO(support.TESTFN, "wb")
2446 b = self.BufferedWriter(rawio)
2447 t = self.TextIOWrapper(b, encoding="ascii")
2448 t.write("456def")
2449 t.x = t
2450 wr = weakref.ref(t)
2451 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002452 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002453 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002454 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002455 self.assertEqual(f.read(), b"456def")
2456
Charles-François Natali42c28cd2011-10-05 19:53:43 +02002457 def test_rwpair_cleared_before_textio(self):
2458 # Issue 13070: TextIOWrapper's finalization would crash when called
2459 # after the reference to the underlying BufferedRWPair's writer got
2460 # cleared by the GC.
2461 for i in range(1000):
2462 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2463 t1 = self.TextIOWrapper(b1, encoding="ascii")
2464 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2465 t2 = self.TextIOWrapper(b2, encoding="ascii")
2466 # circular references
2467 t1.buddy = t2
2468 t2.buddy = t1
2469 support.gc_collect()
2470
2471
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002472class PyTextIOWrapperTest(TextIOWrapperTest):
2473 pass
2474
2475
2476class IncrementalNewlineDecoderTest(unittest.TestCase):
2477
2478 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002479 # UTF-8 specific tests for a newline decoder
2480 def _check_decode(b, s, **kwargs):
2481 # We exercise getstate() / setstate() as well as decode()
2482 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002483 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002484 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002485 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002486
Antoine Pitrou180a3362008-12-14 16:36:46 +00002487 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002488
Antoine Pitrou180a3362008-12-14 16:36:46 +00002489 _check_decode(b'\xe8', "")
2490 _check_decode(b'\xa2', "")
2491 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002492
Antoine Pitrou180a3362008-12-14 16:36:46 +00002493 _check_decode(b'\xe8', "")
2494 _check_decode(b'\xa2', "")
2495 _check_decode(b'\x88', "\u8888")
2496
2497 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002498 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2499
Antoine Pitrou180a3362008-12-14 16:36:46 +00002500 decoder.reset()
2501 _check_decode(b'\n', "\n")
2502 _check_decode(b'\r', "")
2503 _check_decode(b'', "\n", final=True)
2504 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002505
Antoine Pitrou180a3362008-12-14 16:36:46 +00002506 _check_decode(b'\r', "")
2507 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002508
Antoine Pitrou180a3362008-12-14 16:36:46 +00002509 _check_decode(b'\r\r\n', "\n\n")
2510 _check_decode(b'\r', "")
2511 _check_decode(b'\r', "\n")
2512 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002513
Antoine Pitrou180a3362008-12-14 16:36:46 +00002514 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2515 _check_decode(b'\xe8\xa2\x88', "\u8888")
2516 _check_decode(b'\n', "\n")
2517 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2518 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002519
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002520 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002521 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002522 if encoding is not None:
2523 encoder = codecs.getincrementalencoder(encoding)()
2524 def _decode_bytewise(s):
2525 # Decode one byte at a time
2526 for b in encoder.encode(s):
2527 result.append(decoder.decode(bytes([b])))
2528 else:
2529 encoder = None
2530 def _decode_bytewise(s):
2531 # Decode one char at a time
2532 for c in s:
2533 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002534 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002535 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002536 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00002537 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002538 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002539 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002540 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002541 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002542 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002543 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002544 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002545 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002546 input = "abc"
2547 if encoder is not None:
2548 encoder.reset()
2549 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002550 self.assertEqual(decoder.decode(input), "abc")
2551 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002552
2553 def test_newline_decoder(self):
2554 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002555 # None meaning the IncrementalNewlineDecoder takes unicode input
2556 # rather than bytes input
2557 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002558 'utf-16', 'utf-16-le', 'utf-16-be',
2559 'utf-32', 'utf-32-le', 'utf-32-be',
2560 )
2561 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002562 decoder = enc and codecs.getincrementaldecoder(enc)()
2563 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2564 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002565 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002566 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2567 self.check_newline_decoding_utf8(decoder)
2568
Antoine Pitrou66913e22009-03-06 23:40:56 +00002569 def test_newline_bytes(self):
2570 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2571 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002572 self.assertEqual(dec.newlines, None)
2573 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2574 self.assertEqual(dec.newlines, None)
2575 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2576 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00002577 dec = self.IncrementalNewlineDecoder(None, translate=False)
2578 _check(dec)
2579 dec = self.IncrementalNewlineDecoder(None, translate=True)
2580 _check(dec)
2581
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002582class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2583 pass
2584
2585class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2586 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002587
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002588
Guido van Rossum01a27522007-03-07 01:00:12 +00002589# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002590
Guido van Rossum5abbf752007-08-27 17:39:33 +00002591class MiscIOTest(unittest.TestCase):
2592
Barry Warsaw40e82462008-11-20 20:14:50 +00002593 def tearDown(self):
2594 support.unlink(support.TESTFN)
2595
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002596 def test___all__(self):
2597 for name in self.io.__all__:
2598 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002599 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002600 if name == "open":
2601 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002602 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002603 self.assertTrue(issubclass(obj, Exception), name)
2604 elif not name.startswith("SEEK_"):
2605 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002606
Barry Warsaw40e82462008-11-20 20:14:50 +00002607 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002608 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002609 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002610 f.close()
2611
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002612 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002613 self.assertEqual(f.name, support.TESTFN)
2614 self.assertEqual(f.buffer.name, support.TESTFN)
2615 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2616 self.assertEqual(f.mode, "U")
2617 self.assertEqual(f.buffer.mode, "rb")
2618 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002619 f.close()
2620
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002621 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002622 self.assertEqual(f.mode, "w+")
2623 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2624 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002625
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002626 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002627 self.assertEqual(g.mode, "wb")
2628 self.assertEqual(g.raw.mode, "wb")
2629 self.assertEqual(g.name, f.fileno())
2630 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00002631 f.close()
2632 g.close()
2633
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002634 def test_io_after_close(self):
2635 for kwargs in [
2636 {"mode": "w"},
2637 {"mode": "wb"},
2638 {"mode": "w", "buffering": 1},
2639 {"mode": "w", "buffering": 2},
2640 {"mode": "wb", "buffering": 0},
2641 {"mode": "r"},
2642 {"mode": "rb"},
2643 {"mode": "r", "buffering": 1},
2644 {"mode": "r", "buffering": 2},
2645 {"mode": "rb", "buffering": 0},
2646 {"mode": "w+"},
2647 {"mode": "w+b"},
2648 {"mode": "w+", "buffering": 1},
2649 {"mode": "w+", "buffering": 2},
2650 {"mode": "w+b", "buffering": 0},
2651 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002652 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002653 f.close()
2654 self.assertRaises(ValueError, f.flush)
2655 self.assertRaises(ValueError, f.fileno)
2656 self.assertRaises(ValueError, f.isatty)
2657 self.assertRaises(ValueError, f.__iter__)
2658 if hasattr(f, "peek"):
2659 self.assertRaises(ValueError, f.peek, 1)
2660 self.assertRaises(ValueError, f.read)
2661 if hasattr(f, "read1"):
2662 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02002663 if hasattr(f, "readall"):
2664 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002665 if hasattr(f, "readinto"):
2666 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2667 self.assertRaises(ValueError, f.readline)
2668 self.assertRaises(ValueError, f.readlines)
2669 self.assertRaises(ValueError, f.seek, 0)
2670 self.assertRaises(ValueError, f.tell)
2671 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002672 self.assertRaises(ValueError, f.write,
2673 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002674 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002675 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002676
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002677 def test_blockingioerror(self):
2678 # Various BlockingIOError issues
2679 self.assertRaises(TypeError, self.BlockingIOError)
2680 self.assertRaises(TypeError, self.BlockingIOError, 1)
2681 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2682 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2683 b = self.BlockingIOError(1, "")
2684 self.assertEqual(b.characters_written, 0)
2685 class C(str):
2686 pass
2687 c = C("")
2688 b = self.BlockingIOError(1, c)
2689 c.b = b
2690 b.c = c
2691 wr = weakref.ref(c)
2692 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002693 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002694 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002695
2696 def test_abcs(self):
2697 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002698 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2699 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2700 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2701 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002702
2703 def _check_abc_inheritance(self, abcmodule):
2704 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002705 self.assertIsInstance(f, abcmodule.IOBase)
2706 self.assertIsInstance(f, abcmodule.RawIOBase)
2707 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2708 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002709 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002710 self.assertIsInstance(f, abcmodule.IOBase)
2711 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2712 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2713 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002714 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002715 self.assertIsInstance(f, abcmodule.IOBase)
2716 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2717 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2718 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002719
2720 def test_abc_inheritance(self):
2721 # Test implementations inherit from their respective ABCs
2722 self._check_abc_inheritance(self)
2723
2724 def test_abc_inheritance_official(self):
2725 # Test implementations inherit from the official ABCs of the
2726 # baseline "io" module.
2727 self._check_abc_inheritance(io)
2728
Antoine Pitroue033e062010-10-29 10:38:18 +00002729 def _check_warn_on_dealloc(self, *args, **kwargs):
2730 f = open(*args, **kwargs)
2731 r = repr(f)
2732 with self.assertWarns(ResourceWarning) as cm:
2733 f = None
2734 support.gc_collect()
2735 self.assertIn(r, str(cm.warning.args[0]))
2736
2737 def test_warn_on_dealloc(self):
2738 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
2739 self._check_warn_on_dealloc(support.TESTFN, "wb")
2740 self._check_warn_on_dealloc(support.TESTFN, "w")
2741
2742 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
2743 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00002744 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00002745 for fd in fds:
2746 try:
2747 os.close(fd)
2748 except EnvironmentError as e:
2749 if e.errno != errno.EBADF:
2750 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00002751 self.addCleanup(cleanup_fds)
2752 r, w = os.pipe()
2753 fds += r, w
2754 self._check_warn_on_dealloc(r, *args, **kwargs)
2755 # When using closefd=False, there's no warning
2756 r, w = os.pipe()
2757 fds += r, w
2758 with warnings.catch_warnings(record=True) as recorded:
2759 open(r, *args, closefd=False, **kwargs)
2760 support.gc_collect()
2761 self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00002762
2763 def test_warn_on_dealloc_fd(self):
2764 self._check_warn_on_dealloc_fd("rb", buffering=0)
2765 self._check_warn_on_dealloc_fd("rb")
2766 self._check_warn_on_dealloc_fd("r")
2767
2768
Antoine Pitrou243757e2010-11-05 21:15:39 +00002769 def test_pickling(self):
2770 # Pickling file objects is forbidden
2771 for kwargs in [
2772 {"mode": "w"},
2773 {"mode": "wb"},
2774 {"mode": "wb", "buffering": 0},
2775 {"mode": "r"},
2776 {"mode": "rb"},
2777 {"mode": "rb", "buffering": 0},
2778 {"mode": "w+"},
2779 {"mode": "w+b"},
2780 {"mode": "w+b", "buffering": 0},
2781 ]:
2782 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
2783 with self.open(support.TESTFN, **kwargs) as f:
2784 self.assertRaises(TypeError, pickle.dumps, f, protocol)
2785
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01002786 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2787 def test_nonblock_pipe_write_bigbuf(self):
2788 self._test_nonblock_pipe_write(16*1024)
2789
2790 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2791 def test_nonblock_pipe_write_smallbuf(self):
2792 self._test_nonblock_pipe_write(1024)
2793
2794 def _set_non_blocking(self, fd):
2795 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
2796 self.assertNotEqual(flags, -1)
2797 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
2798 self.assertEqual(res, 0)
2799
2800 def _test_nonblock_pipe_write(self, bufsize):
2801 sent = []
2802 received = []
2803 r, w = os.pipe()
2804 self._set_non_blocking(r)
2805 self._set_non_blocking(w)
2806
2807 # To exercise all code paths in the C implementation we need
2808 # to play with buffer sizes. For instance, if we choose a
2809 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
2810 # then we will never get a partial write of the buffer.
2811 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
2812 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
2813
2814 with rf, wf:
2815 for N in 9999, 73, 7574:
2816 try:
2817 i = 0
2818 while True:
2819 msg = bytes([i % 26 + 97]) * N
2820 sent.append(msg)
2821 wf.write(msg)
2822 i += 1
2823
2824 except self.BlockingIOError as e:
2825 self.assertEqual(e.args[0], errno.EAGAIN)
2826 sent[-1] = sent[-1][:e.characters_written]
2827 received.append(rf.read())
2828 msg = b'BLOCKED'
2829 wf.write(msg)
2830 sent.append(msg)
2831
2832 while True:
2833 try:
2834 wf.flush()
2835 break
2836 except self.BlockingIOError as e:
2837 self.assertEqual(e.args[0], errno.EAGAIN)
2838 self.assertEqual(e.characters_written, 0)
2839 received.append(rf.read())
2840
2841 received += iter(rf.read, None)
2842
2843 sent, received = b''.join(sent), b''.join(received)
2844 self.assertTrue(sent == received)
2845 self.assertTrue(wf.closed)
2846 self.assertTrue(rf.closed)
2847
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002848class CMiscIOTest(MiscIOTest):
2849 io = io
2850
2851class PyMiscIOTest(MiscIOTest):
2852 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002853
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002854
2855@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2856class SignalsTest(unittest.TestCase):
2857
2858 def setUp(self):
2859 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2860
2861 def tearDown(self):
2862 signal.signal(signal.SIGALRM, self.oldalrm)
2863
2864 def alarm_interrupt(self, sig, frame):
2865 1/0
2866
2867 @unittest.skipUnless(threading, 'Threading required for this test.')
Victor Stinnercd1aa0d2011-07-04 11:48:17 +02002868 @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
2869 'issue #12429: skip test on FreeBSD <= 7')
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002870 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2871 """Check that a partial write, when it gets interrupted, properly
Antoine Pitroud843c2d2011-02-25 21:34:39 +00002872 invokes the signal handler, and bubbles up the exception raised
2873 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002874 read_results = []
2875 def _read():
2876 s = os.read(r, 1)
2877 read_results.append(s)
2878 t = threading.Thread(target=_read)
2879 t.daemon = True
2880 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00002881 fdopen_kwargs["closefd"] = False
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002882 try:
2883 wio = self.io.open(w, **fdopen_kwargs)
2884 t.start()
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07002885 signal.alarm(1)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002886 # Fill the pipe enough that the write will be blocking.
2887 # It will be interrupted by the timer armed above. Since the
2888 # other thread has read one byte, the low-level write will
2889 # return with a successful (partial) result rather than an EINTR.
2890 # The buffered IO layer must check for pending signal
2891 # handlers, which in this case will invoke alarm_interrupt().
2892 self.assertRaises(ZeroDivisionError,
2893 wio.write, item * (1024 * 1024))
2894 t.join()
2895 # We got one byte, get another one and check that it isn't a
2896 # repeat of the first one.
2897 read_results.append(os.read(r, 1))
2898 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2899 finally:
2900 os.close(w)
2901 os.close(r)
2902 # This is deliberate. If we didn't close the file descriptor
2903 # before closing wio, wio would try to flush its internal
2904 # buffer, and block again.
2905 try:
2906 wio.close()
2907 except IOError as e:
2908 if e.errno != errno.EBADF:
2909 raise
2910
2911 def test_interrupted_write_unbuffered(self):
2912 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2913
2914 def test_interrupted_write_buffered(self):
2915 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2916
2917 def test_interrupted_write_text(self):
2918 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2919
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002920 def check_reentrant_write(self, data, **fdopen_kwargs):
2921 def on_alarm(*args):
2922 # Will be called reentrantly from the same thread
2923 wio.write(data)
2924 1/0
2925 signal.signal(signal.SIGALRM, on_alarm)
2926 r, w = os.pipe()
2927 wio = self.io.open(w, **fdopen_kwargs)
2928 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07002929 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002930 # Either the reentrant call to wio.write() fails with RuntimeError,
2931 # or the signal handler raises ZeroDivisionError.
2932 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
2933 while 1:
2934 for i in range(100):
2935 wio.write(data)
2936 wio.flush()
2937 # Make sure the buffer doesn't fill up and block further writes
2938 os.read(r, len(data) * 100)
2939 exc = cm.exception
2940 if isinstance(exc, RuntimeError):
2941 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
2942 finally:
2943 wio.close()
2944 os.close(r)
2945
2946 def test_reentrant_write_buffered(self):
2947 self.check_reentrant_write(b"xy", mode="wb")
2948
2949 def test_reentrant_write_text(self):
2950 self.check_reentrant_write("xy", mode="w", encoding="ascii")
2951
Antoine Pitroud843c2d2011-02-25 21:34:39 +00002952 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
2953 """Check that a buffered read, when it gets interrupted (either
2954 returning a partial result or EINTR), properly invokes the signal
2955 handler and retries if the latter returned successfully."""
2956 r, w = os.pipe()
2957 fdopen_kwargs["closefd"] = False
2958 def alarm_handler(sig, frame):
2959 os.write(w, b"bar")
2960 signal.signal(signal.SIGALRM, alarm_handler)
2961 try:
2962 rio = self.io.open(r, **fdopen_kwargs)
2963 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07002964 signal.alarm(1)
Antoine Pitroud843c2d2011-02-25 21:34:39 +00002965 # Expected behaviour:
2966 # - first raw read() returns partial b"foo"
2967 # - second raw read() returns EINTR
2968 # - third raw read() returns b"bar"
2969 self.assertEqual(decode(rio.read(6)), "foobar")
2970 finally:
2971 rio.close()
2972 os.close(w)
2973 os.close(r)
2974
Antoine Pitrou20db5112011-08-19 20:32:34 +02002975 def test_interrupted_read_retry_buffered(self):
Antoine Pitroud843c2d2011-02-25 21:34:39 +00002976 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
2977 mode="rb")
2978
Antoine Pitrou20db5112011-08-19 20:32:34 +02002979 def test_interrupted_read_retry_text(self):
Antoine Pitroud843c2d2011-02-25 21:34:39 +00002980 self.check_interrupted_read_retry(lambda x: x,
2981 mode="r")
2982
2983 @unittest.skipUnless(threading, 'Threading required for this test.')
2984 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
2985 """Check that a buffered write, when it gets interrupted (either
2986 returning a partial result or EINTR), properly invokes the signal
2987 handler and retries if the latter returned successfully."""
2988 select = support.import_module("select")
2989 # A quantity that exceeds the buffer size of an anonymous pipe's
2990 # write end.
2991 N = 1024 * 1024
2992 r, w = os.pipe()
2993 fdopen_kwargs["closefd"] = False
2994 # We need a separate thread to read from the pipe and allow the
2995 # write() to finish. This thread is started after the SIGALRM is
2996 # received (forcing a first EINTR in write()).
2997 read_results = []
2998 write_finished = False
2999 def _read():
3000 while not write_finished:
3001 while r in select.select([r], [], [], 1.0)[0]:
3002 s = os.read(r, 1024)
3003 read_results.append(s)
3004 t = threading.Thread(target=_read)
3005 t.daemon = True
3006 def alarm1(sig, frame):
3007 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003008 signal.alarm(1)
Antoine Pitroud843c2d2011-02-25 21:34:39 +00003009 def alarm2(sig, frame):
3010 t.start()
3011 signal.signal(signal.SIGALRM, alarm1)
3012 try:
3013 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003014 signal.alarm(1)
Antoine Pitroud843c2d2011-02-25 21:34:39 +00003015 # Expected behaviour:
3016 # - first raw write() is partial (because of the limited pipe buffer
3017 # and the first alarm)
3018 # - second raw write() returns EINTR (because of the second alarm)
3019 # - subsequent write()s are successful (either partial or complete)
3020 self.assertEqual(N, wio.write(item * N))
3021 wio.flush()
3022 write_finished = True
3023 t.join()
3024 self.assertEqual(N, sum(len(x) for x in read_results))
3025 finally:
3026 write_finished = True
3027 os.close(w)
3028 os.close(r)
3029 # This is deliberate. If we didn't close the file descriptor
3030 # before closing wio, wio would try to flush its internal
3031 # buffer, and could block (in case of failure).
3032 try:
3033 wio.close()
3034 except IOError as e:
3035 if e.errno != errno.EBADF:
3036 raise
3037
Antoine Pitrou20db5112011-08-19 20:32:34 +02003038 def test_interrupted_write_retry_buffered(self):
Antoine Pitroud843c2d2011-02-25 21:34:39 +00003039 self.check_interrupted_write_retry(b"x", mode="wb")
3040
Antoine Pitrou20db5112011-08-19 20:32:34 +02003041 def test_interrupted_write_retry_text(self):
Antoine Pitroud843c2d2011-02-25 21:34:39 +00003042 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3043
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003044
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003045class CSignalsTest(SignalsTest):
3046 io = io
3047
3048class PySignalsTest(SignalsTest):
3049 io = pyio
3050
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003051 # Handling reentrancy issues would slow down _pyio even more, so the
3052 # tests are disabled.
3053 test_reentrant_write_buffered = None
3054 test_reentrant_write_text = None
3055
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003056
Guido van Rossum28524c72007-02-27 05:47:44 +00003057def test_main():
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003058 tests = (CIOTest, PyIOTest,
3059 CBufferedReaderTest, PyBufferedReaderTest,
3060 CBufferedWriterTest, PyBufferedWriterTest,
3061 CBufferedRWPairTest, PyBufferedRWPairTest,
3062 CBufferedRandomTest, PyBufferedRandomTest,
3063 StatefulIncrementalDecoderTest,
3064 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3065 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003066 CMiscIOTest, PyMiscIOTest,
3067 CSignalsTest, PySignalsTest,
3068 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003069
3070 # Put the namespaces of the IO module we are testing and some useful mock
3071 # classes in the __dict__ of each test.
3072 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou328ec742010-09-14 18:37:24 +00003073 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003074 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3075 c_io_ns = {name : getattr(io, name) for name in all_members}
3076 py_io_ns = {name : getattr(pyio, name) for name in all_members}
3077 globs = globals()
3078 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3079 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3080 # Avoid turning open into a bound method.
3081 py_io_ns["open"] = pyio.OpenWrapper
3082 for test in tests:
3083 if test.__name__.startswith("C"):
3084 for name, obj in c_io_ns.items():
3085 setattr(test, name, obj)
3086 elif test.__name__.startswith("Py"):
3087 for name, obj in py_io_ns.items():
3088 setattr(test, name, obj)
3089
3090 support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00003091
3092if __name__ == "__main__":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003093 test_main()