blob: 3724b5cc16b7a74f663fc9d529b9f0398ae2875a [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import random
Guido van Rossum28524c72007-02-27 05:47:44 +000027import unittest
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000028import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000029import abc
Antoine Pitroub46b9d52010-08-21 19:09:32 +000030import signal
31import errno
Antoine Pitroue033e062010-10-29 10:38:18 +000032import warnings
Antoine Pitrou243757e2010-11-05 21:15:39 +000033import pickle
Georg Brandl1b37e872010-03-14 10:45:50 +000034from itertools import cycle, count
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000035from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000036from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000037
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000038import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000039import io # C implementation of io
40import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000041try:
42 import threading
43except ImportError:
44 threading = None
Guido van Rossum28524c72007-02-27 05:47:44 +000045
Guido van Rossuma9e20242007-03-08 00:43:48 +000046
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000047def _default_chunk_size():
48 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000049 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000050 return f._CHUNK_SIZE
51
52
Antoine Pitrou328ec742010-09-14 18:37:24 +000053class MockRawIOWithoutRead:
54 """A RawIO implementation without read(), so as to exercise the default
55 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000056
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000057 def __init__(self, read_stack=()):
58 self._read_stack = list(read_stack)
59 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000060 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000061 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000062
Guido van Rossum01a27522007-03-07 01:00:12 +000063 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000064 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000065 return len(b)
66
67 def writable(self):
68 return True
69
Guido van Rossum68bbcd22007-02-27 17:19:33 +000070 def fileno(self):
71 return 42
72
73 def readable(self):
74 return True
75
Guido van Rossum01a27522007-03-07 01:00:12 +000076 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000077 return True
78
Guido van Rossum01a27522007-03-07 01:00:12 +000079 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000080 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000081
82 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000083 return 0 # same comment as above
84
85 def readinto(self, buf):
86 self._reads += 1
87 max_len = len(buf)
88 try:
89 data = self._read_stack[0]
90 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +000091 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000092 return 0
93 if data is None:
94 del self._read_stack[0]
95 return None
96 n = len(data)
97 if len(data) <= max_len:
98 del self._read_stack[0]
99 buf[:n] = data
100 return n
101 else:
102 buf[:] = data[:max_len]
103 self._read_stack[0] = data[max_len:]
104 return max_len
105
106 def truncate(self, pos=None):
107 return pos
108
Antoine Pitrou328ec742010-09-14 18:37:24 +0000109class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
110 pass
111
112class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
113 pass
114
115
116class MockRawIO(MockRawIOWithoutRead):
117
118 def read(self, n=None):
119 self._reads += 1
120 try:
121 return self._read_stack.pop(0)
122 except:
123 self._extraneous_reads += 1
124 return b""
125
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000126class CMockRawIO(MockRawIO, io.RawIOBase):
127 pass
128
129class PyMockRawIO(MockRawIO, pyio.RawIOBase):
130 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000131
Guido van Rossuma9e20242007-03-08 00:43:48 +0000132
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000133class MisbehavedRawIO(MockRawIO):
134 def write(self, b):
135 return super().write(b) * 2
136
137 def read(self, n=None):
138 return super().read(n) * 2
139
140 def seek(self, pos, whence):
141 return -123
142
143 def tell(self):
144 return -456
145
146 def readinto(self, buf):
147 super().readinto(buf)
148 return len(buf) * 5
149
150class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
151 pass
152
153class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
154 pass
155
156
157class CloseFailureIO(MockRawIO):
158 closed = 0
159
160 def close(self):
161 if not self.closed:
162 self.closed = 1
163 raise IOError
164
165class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
166 pass
167
168class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
169 pass
170
171
172class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000173
174 def __init__(self, data):
175 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000176 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000177
178 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000179 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000180 self.read_history.append(None if res is None else len(res))
181 return res
182
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000183 def readinto(self, b):
184 res = super().readinto(b)
185 self.read_history.append(res)
186 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000187
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000188class CMockFileIO(MockFileIO, io.BytesIO):
189 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000190
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000191class PyMockFileIO(MockFileIO, pyio.BytesIO):
192 pass
193
194
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000195class MockUnseekableIO:
196 def seekable(self):
197 return False
198
199 def seek(self, *args):
200 raise self.UnsupportedOperation("not seekable")
201
202 def tell(self, *args):
203 raise self.UnsupportedOperation("not seekable")
204
205class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
206 UnsupportedOperation = io.UnsupportedOperation
207
208class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
209 UnsupportedOperation = pyio.UnsupportedOperation
210
211
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000212class MockNonBlockWriterIO:
213
214 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000215 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000216 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000217
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000218 def pop_written(self):
219 s = b"".join(self._write_stack)
220 self._write_stack[:] = []
221 return s
222
223 def block_on(self, char):
224 """Block when a given char is encountered."""
225 self._blocker_char = char
226
227 def readable(self):
228 return True
229
230 def seekable(self):
231 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000232
Guido van Rossum01a27522007-03-07 01:00:12 +0000233 def writable(self):
234 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000235
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000236 def write(self, b):
237 b = bytes(b)
238 n = -1
239 if self._blocker_char:
240 try:
241 n = b.index(self._blocker_char)
242 except ValueError:
243 pass
244 else:
245 self._blocker_char = None
246 self._write_stack.append(b[:n])
247 raise self.BlockingIOError(0, "test blocking", n)
248 self._write_stack.append(b)
249 return len(b)
250
251class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
252 BlockingIOError = io.BlockingIOError
253
254class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
255 BlockingIOError = pyio.BlockingIOError
256
Guido van Rossuma9e20242007-03-08 00:43:48 +0000257
Guido van Rossum28524c72007-02-27 05:47:44 +0000258class IOTest(unittest.TestCase):
259
Neal Norwitze7789b12008-03-24 06:18:09 +0000260 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000261 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000262
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000263 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000264 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000265
Guido van Rossum28524c72007-02-27 05:47:44 +0000266 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000267 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000268 f.truncate(0)
269 self.assertEqual(f.tell(), 5)
270 f.seek(0)
271
272 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000273 self.assertEqual(f.seek(0), 0)
274 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000275 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000276 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000277 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000278 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000279 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000280 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000281 self.assertEqual(f.seek(-1, 2), 13)
282 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000283
Guido van Rossum87429772007-04-10 21:06:59 +0000284 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000285 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000286 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000287
Guido van Rossum9b76da62007-04-11 01:09:03 +0000288 def read_ops(self, f, buffered=False):
289 data = f.read(5)
290 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000291 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000292 self.assertEqual(f.readinto(data), 5)
293 self.assertEqual(data, b" worl")
294 self.assertEqual(f.readinto(data), 2)
295 self.assertEqual(len(data), 5)
296 self.assertEqual(data[:2], b"d\n")
297 self.assertEqual(f.seek(0), 0)
298 self.assertEqual(f.read(20), b"hello world\n")
299 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000300 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000301 self.assertEqual(f.seek(-6, 2), 6)
302 self.assertEqual(f.read(5), b"world")
303 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000304 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000305 self.assertEqual(f.seek(-6, 1), 5)
306 self.assertEqual(f.read(5), b" worl")
307 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000308 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000309 if buffered:
310 f.seek(0)
311 self.assertEqual(f.read(), b"hello world\n")
312 f.seek(6)
313 self.assertEqual(f.read(), b"world\n")
314 self.assertEqual(f.read(), b"")
315
Guido van Rossum34d69e52007-04-10 20:08:41 +0000316 LARGE = 2**31
317
Guido van Rossum53807da2007-04-10 19:01:47 +0000318 def large_file_ops(self, f):
319 assert f.readable()
320 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000321 self.assertEqual(f.seek(self.LARGE), self.LARGE)
322 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000323 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000324 self.assertEqual(f.tell(), self.LARGE + 3)
325 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000326 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000327 self.assertEqual(f.tell(), self.LARGE + 2)
328 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000329 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000330 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000331 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
332 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000333 self.assertEqual(f.read(2), b"x")
334
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000335 def test_invalid_operations(self):
336 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000337 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000338 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000339 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000340 self.assertRaises(exc, fp.read)
341 self.assertRaises(exc, fp.readline)
342 with self.open(support.TESTFN, "wb", buffering=0) as fp:
343 self.assertRaises(exc, fp.read)
344 self.assertRaises(exc, fp.readline)
345 with self.open(support.TESTFN, "rb", buffering=0) as fp:
346 self.assertRaises(exc, fp.write, b"blah")
347 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000348 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000349 self.assertRaises(exc, fp.write, b"blah")
350 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000351 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000352 self.assertRaises(exc, fp.write, "blah")
353 self.assertRaises(exc, fp.writelines, ["blah\n"])
354 # Non-zero seeking from current or end pos
355 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
356 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000357
Guido van Rossum28524c72007-02-27 05:47:44 +0000358 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000359 with self.open(support.TESTFN, "wb", buffering=0) as f:
360 self.assertEqual(f.readable(), False)
361 self.assertEqual(f.writable(), True)
362 self.assertEqual(f.seekable(), True)
363 self.write_ops(f)
364 with self.open(support.TESTFN, "rb", buffering=0) as f:
365 self.assertEqual(f.readable(), True)
366 self.assertEqual(f.writable(), False)
367 self.assertEqual(f.seekable(), True)
368 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000369
Guido van Rossum87429772007-04-10 21:06:59 +0000370 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000371 with self.open(support.TESTFN, "wb") as f:
372 self.assertEqual(f.readable(), False)
373 self.assertEqual(f.writable(), True)
374 self.assertEqual(f.seekable(), True)
375 self.write_ops(f)
376 with self.open(support.TESTFN, "rb") as f:
377 self.assertEqual(f.readable(), True)
378 self.assertEqual(f.writable(), False)
379 self.assertEqual(f.seekable(), True)
380 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000381
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000382 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000383 with self.open(support.TESTFN, "wb") as f:
384 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
385 with self.open(support.TESTFN, "rb") as f:
386 self.assertEqual(f.readline(), b"abc\n")
387 self.assertEqual(f.readline(10), b"def\n")
388 self.assertEqual(f.readline(2), b"xy")
389 self.assertEqual(f.readline(4), b"zzy\n")
390 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000391 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000392 self.assertRaises(TypeError, f.readline, 5.3)
393 with self.open(support.TESTFN, "r") as f:
394 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000395
Guido van Rossum28524c72007-02-27 05:47:44 +0000396 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000397 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000398 self.write_ops(f)
399 data = f.getvalue()
400 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000401 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000402 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000403
Guido van Rossum53807da2007-04-10 19:01:47 +0000404 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000405 # On Windows and Mac OSX this test comsumes large resources; It takes
406 # a long time to build the >2GB file and takes >2GB of disk space
407 # therefore the resource must be enabled to run this test.
408 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000409 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000410 print("\nTesting large file ops skipped on %s." % sys.platform,
411 file=sys.stderr)
412 print("It requires %d bytes and a long time." % self.LARGE,
413 file=sys.stderr)
414 print("Use 'regrtest.py -u largefile test_io' to run it.",
415 file=sys.stderr)
416 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000417 with self.open(support.TESTFN, "w+b", 0) as f:
418 self.large_file_ops(f)
419 with self.open(support.TESTFN, "w+b") as f:
420 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000421
422 def test_with_open(self):
423 for bufsize in (0, 1, 100):
424 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000425 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000426 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000427 self.assertEqual(f.closed, True)
428 f = None
429 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000430 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000431 1/0
432 except ZeroDivisionError:
433 self.assertEqual(f.closed, True)
434 else:
435 self.fail("1/0 didn't raise an exception")
436
Antoine Pitrou08838b62009-01-21 00:55:13 +0000437 # issue 5008
438 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000439 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000440 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000441 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000442 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000443 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000444 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000445 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000446 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000447
Guido van Rossum87429772007-04-10 21:06:59 +0000448 def test_destructor(self):
449 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000450 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000451 def __del__(self):
452 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000453 try:
454 f = super().__del__
455 except AttributeError:
456 pass
457 else:
458 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000459 def close(self):
460 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000461 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000462 def flush(self):
463 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000464 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000465 with support.check_warnings(('', ResourceWarning)):
466 f = MyFileIO(support.TESTFN, "wb")
467 f.write(b"xxx")
468 del f
469 support.gc_collect()
470 self.assertEqual(record, [1, 2, 3])
471 with self.open(support.TESTFN, "rb") as f:
472 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000473
474 def _check_base_destructor(self, base):
475 record = []
476 class MyIO(base):
477 def __init__(self):
478 # This exercises the availability of attributes on object
479 # destruction.
480 # (in the C version, close() is called by the tp_dealloc
481 # function, not by __del__)
482 self.on_del = 1
483 self.on_close = 2
484 self.on_flush = 3
485 def __del__(self):
486 record.append(self.on_del)
487 try:
488 f = super().__del__
489 except AttributeError:
490 pass
491 else:
492 f()
493 def close(self):
494 record.append(self.on_close)
495 super().close()
496 def flush(self):
497 record.append(self.on_flush)
498 super().flush()
499 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000500 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000501 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000502 self.assertEqual(record, [1, 2, 3])
503
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000504 def test_IOBase_destructor(self):
505 self._check_base_destructor(self.IOBase)
506
507 def test_RawIOBase_destructor(self):
508 self._check_base_destructor(self.RawIOBase)
509
510 def test_BufferedIOBase_destructor(self):
511 self._check_base_destructor(self.BufferedIOBase)
512
513 def test_TextIOBase_destructor(self):
514 self._check_base_destructor(self.TextIOBase)
515
Guido van Rossum87429772007-04-10 21:06:59 +0000516 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000517 with self.open(support.TESTFN, "wb") as f:
518 f.write(b"xxx")
519 with self.open(support.TESTFN, "rb") as f:
520 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000521
Guido van Rossumd4103952007-04-12 05:44:49 +0000522 def test_array_writes(self):
523 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000524 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000525 with self.open(support.TESTFN, "wb", 0) as f:
526 self.assertEqual(f.write(a), n)
527 with self.open(support.TESTFN, "wb") as f:
528 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000529
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000530 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000531 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000532 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000533
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000534 def test_read_closed(self):
535 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000536 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000537 with self.open(support.TESTFN, "r") as f:
538 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000539 self.assertEqual(file.read(), "egg\n")
540 file.seek(0)
541 file.close()
542 self.assertRaises(ValueError, file.read)
543
544 def test_no_closefd_with_filename(self):
545 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000546 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000547
548 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000549 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000550 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000551 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000552 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000553 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000554 self.assertEqual(file.buffer.raw.closefd, False)
555
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000556 def test_garbage_collection(self):
557 # FileIO objects are collected, and collecting them flushes
558 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000559 with support.check_warnings(('', ResourceWarning)):
560 f = self.FileIO(support.TESTFN, "wb")
561 f.write(b"abcxxx")
562 f.f = f
563 wr = weakref.ref(f)
564 del f
565 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000566 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000567 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000568 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000569
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000570 def test_unbounded_file(self):
571 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
572 zero = "/dev/zero"
573 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000574 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000575 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000576 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000577 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000578 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000579 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000580 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000581 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000582 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000583 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000584 self.assertRaises(OverflowError, f.read)
585
Antoine Pitrou6be88762010-05-03 16:48:20 +0000586 def test_flush_error_on_close(self):
587 f = self.open(support.TESTFN, "wb", buffering=0)
588 def bad_flush():
589 raise IOError()
590 f.flush = bad_flush
591 self.assertRaises(IOError, f.close) # exception not swallowed
592
593 def test_multi_close(self):
594 f = self.open(support.TESTFN, "wb", buffering=0)
595 f.close()
596 f.close()
597 f.close()
598 self.assertRaises(ValueError, f.flush)
599
Antoine Pitrou328ec742010-09-14 18:37:24 +0000600 def test_RawIOBase_read(self):
601 # Exercise the default RawIOBase.read() implementation (which calls
602 # readinto() internally).
603 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
604 self.assertEqual(rawio.read(2), b"ab")
605 self.assertEqual(rawio.read(2), b"c")
606 self.assertEqual(rawio.read(2), b"d")
607 self.assertEqual(rawio.read(2), None)
608 self.assertEqual(rawio.read(2), b"ef")
609 self.assertEqual(rawio.read(2), b"g")
610 self.assertEqual(rawio.read(2), None)
611 self.assertEqual(rawio.read(2), b"")
612
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000613class CIOTest(IOTest):
614 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000615
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000616class PyIOTest(IOTest):
617 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000618
Guido van Rossuma9e20242007-03-08 00:43:48 +0000619
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000620class CommonBufferedTests:
621 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
622
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000623 def test_detach(self):
624 raw = self.MockRawIO()
625 buf = self.tp(raw)
626 self.assertIs(buf.detach(), raw)
627 self.assertRaises(ValueError, buf.detach)
628
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000629 def test_fileno(self):
630 rawio = self.MockRawIO()
631 bufio = self.tp(rawio)
632
Ezio Melottib3aedd42010-11-20 19:04:17 +0000633 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000634
635 def test_no_fileno(self):
636 # XXX will we always have fileno() function? If so, kill
637 # this test. Else, write it.
638 pass
639
640 def test_invalid_args(self):
641 rawio = self.MockRawIO()
642 bufio = self.tp(rawio)
643 # Invalid whence
644 self.assertRaises(ValueError, bufio.seek, 0, -1)
645 self.assertRaises(ValueError, bufio.seek, 0, 3)
646
647 def test_override_destructor(self):
648 tp = self.tp
649 record = []
650 class MyBufferedIO(tp):
651 def __del__(self):
652 record.append(1)
653 try:
654 f = super().__del__
655 except AttributeError:
656 pass
657 else:
658 f()
659 def close(self):
660 record.append(2)
661 super().close()
662 def flush(self):
663 record.append(3)
664 super().flush()
665 rawio = self.MockRawIO()
666 bufio = MyBufferedIO(rawio)
667 writable = bufio.writable()
668 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000669 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000670 if writable:
671 self.assertEqual(record, [1, 2, 3])
672 else:
673 self.assertEqual(record, [1, 2])
674
675 def test_context_manager(self):
676 # Test usability as a context manager
677 rawio = self.MockRawIO()
678 bufio = self.tp(rawio)
679 def _with():
680 with bufio:
681 pass
682 _with()
683 # bufio should now be closed, and using it a second time should raise
684 # a ValueError.
685 self.assertRaises(ValueError, _with)
686
687 def test_error_through_destructor(self):
688 # Test that the exception state is not modified by a destructor,
689 # even if close() fails.
690 rawio = self.CloseFailureIO()
691 def f():
692 self.tp(rawio).xyzzy
693 with support.captured_output("stderr") as s:
694 self.assertRaises(AttributeError, f)
695 s = s.getvalue().strip()
696 if s:
697 # The destructor *may* have printed an unraisable error, check it
698 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000699 self.assertTrue(s.startswith("Exception IOError: "), s)
700 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000701
Antoine Pitrou716c4442009-05-23 19:04:03 +0000702 def test_repr(self):
703 raw = self.MockRawIO()
704 b = self.tp(raw)
705 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
706 self.assertEqual(repr(b), "<%s>" % clsname)
707 raw.name = "dummy"
708 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
709 raw.name = b"dummy"
710 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
711
Antoine Pitrou6be88762010-05-03 16:48:20 +0000712 def test_flush_error_on_close(self):
713 raw = self.MockRawIO()
714 def bad_flush():
715 raise IOError()
716 raw.flush = bad_flush
717 b = self.tp(raw)
718 self.assertRaises(IOError, b.close) # exception not swallowed
719
720 def test_multi_close(self):
721 raw = self.MockRawIO()
722 b = self.tp(raw)
723 b.close()
724 b.close()
725 b.close()
726 self.assertRaises(ValueError, b.flush)
727
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000728 def test_unseekable(self):
729 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
730 self.assertRaises(self.UnsupportedOperation, bufio.tell)
731 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
732
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000733 def test_readonly_attributes(self):
734 raw = self.MockRawIO()
735 buf = self.tp(raw)
736 x = self.MockRawIO()
737 with self.assertRaises(AttributeError):
738 buf.raw = x
739
Guido van Rossum78892e42007-04-06 17:31:18 +0000740
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000741class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
742 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000743
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000744 def test_constructor(self):
745 rawio = self.MockRawIO([b"abc"])
746 bufio = self.tp(rawio)
747 bufio.__init__(rawio)
748 bufio.__init__(rawio, buffer_size=1024)
749 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000750 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000751 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
752 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
753 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
754 rawio = self.MockRawIO([b"abc"])
755 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000756 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000757
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000758 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000759 for arg in (None, 7):
760 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
761 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000762 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000763 # Invalid args
764 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000765
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000766 def test_read1(self):
767 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
768 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000769 self.assertEqual(b"a", bufio.read(1))
770 self.assertEqual(b"b", bufio.read1(1))
771 self.assertEqual(rawio._reads, 1)
772 self.assertEqual(b"c", bufio.read1(100))
773 self.assertEqual(rawio._reads, 1)
774 self.assertEqual(b"d", bufio.read1(100))
775 self.assertEqual(rawio._reads, 2)
776 self.assertEqual(b"efg", bufio.read1(100))
777 self.assertEqual(rawio._reads, 3)
778 self.assertEqual(b"", bufio.read1(100))
779 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000780 # Invalid args
781 self.assertRaises(ValueError, bufio.read1, -1)
782
783 def test_readinto(self):
784 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
785 bufio = self.tp(rawio)
786 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000787 self.assertEqual(bufio.readinto(b), 2)
788 self.assertEqual(b, b"ab")
789 self.assertEqual(bufio.readinto(b), 2)
790 self.assertEqual(b, b"cd")
791 self.assertEqual(bufio.readinto(b), 2)
792 self.assertEqual(b, b"ef")
793 self.assertEqual(bufio.readinto(b), 1)
794 self.assertEqual(b, b"gf")
795 self.assertEqual(bufio.readinto(b), 0)
796 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +0200797 rawio = self.MockRawIO((b"abc", None))
798 bufio = self.tp(rawio)
799 self.assertEqual(bufio.readinto(b), 2)
800 self.assertEqual(b, b"ab")
801 self.assertEqual(bufio.readinto(b), 1)
802 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000803
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000804 def test_readlines(self):
805 def bufio():
806 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
807 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000808 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
809 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
810 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000811
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000812 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000813 data = b"abcdefghi"
814 dlen = len(data)
815
816 tests = [
817 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
818 [ 100, [ 3, 3, 3], [ dlen ] ],
819 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
820 ]
821
822 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000823 rawio = self.MockFileIO(data)
824 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000825 pos = 0
826 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +0000827 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000828 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000829 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +0000830 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000831
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000832 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000833 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000834 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
835 bufio = self.tp(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000836
Ezio Melottib3aedd42010-11-20 19:04:17 +0000837 self.assertEqual(b"abcd", bufio.read(6))
838 self.assertEqual(b"e", bufio.read(1))
839 self.assertEqual(b"fg", bufio.read())
840 self.assertEqual(b"", bufio.peek(1))
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000841 self.assertTrue(None is bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000842 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000843
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000844 def test_read_past_eof(self):
845 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
846 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000847
Ezio Melottib3aedd42010-11-20 19:04:17 +0000848 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000849
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000850 def test_read_all(self):
851 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
852 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000853
Ezio Melottib3aedd42010-11-20 19:04:17 +0000854 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000855
Victor Stinner45df8202010-04-28 22:31:17 +0000856 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +0000857 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000858 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000859 try:
860 # Write out many bytes with exactly the same number of 0's,
861 # 1's... 255's. This will help us check that concurrent reading
862 # doesn't duplicate or forget contents.
863 N = 1000
864 l = list(range(256)) * N
865 random.shuffle(l)
866 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000867 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +0000868 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000869 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000870 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000871 errors = []
872 results = []
873 def f():
874 try:
875 # Intra-buffer read then buffer-flushing read
876 for n in cycle([1, 19]):
877 s = bufio.read(n)
878 if not s:
879 break
880 # list.append() is atomic
881 results.append(s)
882 except Exception as e:
883 errors.append(e)
884 raise
885 threads = [threading.Thread(target=f) for x in range(20)]
886 for t in threads:
887 t.start()
888 time.sleep(0.02) # yield
889 for t in threads:
890 t.join()
891 self.assertFalse(errors,
892 "the following exceptions were caught: %r" % errors)
893 s = b''.join(results)
894 for i in range(256):
895 c = bytes(bytearray([i]))
896 self.assertEqual(s.count(c), N)
897 finally:
898 support.unlink(support.TESTFN)
899
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000900 def test_misbehaved_io(self):
901 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
902 bufio = self.tp(rawio)
903 self.assertRaises(IOError, bufio.seek, 0)
904 self.assertRaises(IOError, bufio.tell)
905
Antoine Pitrou32cfede2010-08-11 13:31:33 +0000906 def test_no_extraneous_read(self):
907 # Issue #9550; when the raw IO object has satisfied the read request,
908 # we should not issue any additional reads, otherwise it may block
909 # (e.g. socket).
910 bufsize = 16
911 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
912 rawio = self.MockRawIO([b"x" * n])
913 bufio = self.tp(rawio, bufsize)
914 self.assertEqual(bufio.read(n), b"x" * n)
915 # Simple case: one raw read is enough to satisfy the request.
916 self.assertEqual(rawio._extraneous_reads, 0,
917 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
918 # A more complex case where two raw reads are needed to satisfy
919 # the request.
920 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
921 bufio = self.tp(rawio, bufsize)
922 self.assertEqual(bufio.read(n), b"x" * n)
923 self.assertEqual(rawio._extraneous_reads, 0,
924 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
925
926
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000927class CBufferedReaderTest(BufferedReaderTest):
928 tp = io.BufferedReader
929
930 def test_constructor(self):
931 BufferedReaderTest.test_constructor(self)
932 # The allocation can succeed on 32-bit builds, e.g. with more
933 # than 2GB RAM and a 64-bit kernel.
934 if sys.maxsize > 0x7FFFFFFF:
935 rawio = self.MockRawIO()
936 bufio = self.tp(rawio)
937 self.assertRaises((OverflowError, MemoryError, ValueError),
938 bufio.__init__, rawio, sys.maxsize)
939
940 def test_initialization(self):
941 rawio = self.MockRawIO([b"abc"])
942 bufio = self.tp(rawio)
943 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
944 self.assertRaises(ValueError, bufio.read)
945 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
946 self.assertRaises(ValueError, bufio.read)
947 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
948 self.assertRaises(ValueError, bufio.read)
949
950 def test_misbehaved_io_read(self):
951 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
952 bufio = self.tp(rawio)
953 # _pyio.BufferedReader seems to implement reading different, so that
954 # checking this is not so easy.
955 self.assertRaises(IOError, bufio.read, 10)
956
957 def test_garbage_collection(self):
958 # C BufferedReader objects are collected.
959 # The Python version has __del__, so it ends into gc.garbage instead
960 rawio = self.FileIO(support.TESTFN, "w+b")
961 f = self.tp(rawio)
962 f.f = f
963 wr = weakref.ref(f)
964 del f
Benjamin Peterson45cec322009-04-24 23:14:50 +0000965 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000966 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000967
968class PyBufferedReaderTest(BufferedReaderTest):
969 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000970
Guido van Rossuma9e20242007-03-08 00:43:48 +0000971
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000972class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
973 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +0000974
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000975 def test_constructor(self):
976 rawio = self.MockRawIO()
977 bufio = self.tp(rawio)
978 bufio.__init__(rawio)
979 bufio.__init__(rawio, buffer_size=1024)
980 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000981 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000982 bufio.flush()
983 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
984 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
985 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
986 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000987 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000988 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +0000989 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000990
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000991 def test_detach_flush(self):
992 raw = self.MockRawIO()
993 buf = self.tp(raw)
994 buf.write(b"howdy!")
995 self.assertFalse(raw._write_stack)
996 buf.detach()
997 self.assertEqual(raw._write_stack, [b"howdy!"])
998
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000999 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001000 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001001 writer = self.MockRawIO()
1002 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001003 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001004 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001005
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001006 def test_write_overflow(self):
1007 writer = self.MockRawIO()
1008 bufio = self.tp(writer, 8)
1009 contents = b"abcdefghijklmnop"
1010 for n in range(0, len(contents), 3):
1011 bufio.write(contents[n:n+3])
1012 flushed = b"".join(writer._write_stack)
1013 # At least (total - 8) bytes were implicitly flushed, perhaps more
1014 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001015 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001016
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001017 def check_writes(self, intermediate_func):
1018 # Lots of writes, test the flushed output is as expected.
1019 contents = bytes(range(256)) * 1000
1020 n = 0
1021 writer = self.MockRawIO()
1022 bufio = self.tp(writer, 13)
1023 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1024 def gen_sizes():
1025 for size in count(1):
1026 for i in range(15):
1027 yield size
1028 sizes = gen_sizes()
1029 while n < len(contents):
1030 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001031 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001032 intermediate_func(bufio)
1033 n += size
1034 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001035 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001036
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001037 def test_writes(self):
1038 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001039
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001040 def test_writes_and_flushes(self):
1041 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001042
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001043 def test_writes_and_seeks(self):
1044 def _seekabs(bufio):
1045 pos = bufio.tell()
1046 bufio.seek(pos + 1, 0)
1047 bufio.seek(pos - 1, 0)
1048 bufio.seek(pos, 0)
1049 self.check_writes(_seekabs)
1050 def _seekrel(bufio):
1051 pos = bufio.seek(0, 1)
1052 bufio.seek(+1, 1)
1053 bufio.seek(-1, 1)
1054 bufio.seek(pos, 0)
1055 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001056
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001057 def test_writes_and_truncates(self):
1058 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001059
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001060 def test_write_non_blocking(self):
1061 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001062 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001063
Ezio Melottib3aedd42010-11-20 19:04:17 +00001064 self.assertEqual(bufio.write(b"abcd"), 4)
1065 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001066 # 1 byte will be written, the rest will be buffered
1067 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001068 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001069
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001070 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1071 raw.block_on(b"0")
1072 try:
1073 bufio.write(b"opqrwxyz0123456789")
1074 except self.BlockingIOError as e:
1075 written = e.characters_written
1076 else:
1077 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001078 self.assertEqual(written, 16)
1079 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001080 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001081
Ezio Melottib3aedd42010-11-20 19:04:17 +00001082 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001083 s = raw.pop_written()
1084 # Previously buffered bytes were flushed
1085 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001086
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001087 def test_write_and_rewind(self):
1088 raw = io.BytesIO()
1089 bufio = self.tp(raw, 4)
1090 self.assertEqual(bufio.write(b"abcdef"), 6)
1091 self.assertEqual(bufio.tell(), 6)
1092 bufio.seek(0, 0)
1093 self.assertEqual(bufio.write(b"XY"), 2)
1094 bufio.seek(6, 0)
1095 self.assertEqual(raw.getvalue(), b"XYcdef")
1096 self.assertEqual(bufio.write(b"123456"), 6)
1097 bufio.flush()
1098 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001099
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001100 def test_flush(self):
1101 writer = self.MockRawIO()
1102 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001103 bufio.write(b"abc")
1104 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001105 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001106
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001107 def test_destructor(self):
1108 writer = self.MockRawIO()
1109 bufio = self.tp(writer, 8)
1110 bufio.write(b"abc")
1111 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001112 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001113 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001114
1115 def test_truncate(self):
1116 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001117 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001118 bufio = self.tp(raw, 8)
1119 bufio.write(b"abcdef")
1120 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001121 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001122 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001123 self.assertEqual(f.read(), b"abc")
1124
Victor Stinner45df8202010-04-28 22:31:17 +00001125 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001126 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001127 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001128 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001129 # Write out many bytes from many threads and test they were
1130 # all flushed.
1131 N = 1000
1132 contents = bytes(range(256)) * N
1133 sizes = cycle([1, 19])
1134 n = 0
1135 queue = deque()
1136 while n < len(contents):
1137 size = next(sizes)
1138 queue.append(contents[n:n+size])
1139 n += size
1140 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001141 # We use a real file object because it allows us to
1142 # exercise situations where the GIL is released before
1143 # writing the buffer to the raw streams. This is in addition
1144 # to concurrency issues due to switching threads in the middle
1145 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001146 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001147 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001148 errors = []
1149 def f():
1150 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001151 while True:
1152 try:
1153 s = queue.popleft()
1154 except IndexError:
1155 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001156 bufio.write(s)
1157 except Exception as e:
1158 errors.append(e)
1159 raise
1160 threads = [threading.Thread(target=f) for x in range(20)]
1161 for t in threads:
1162 t.start()
1163 time.sleep(0.02) # yield
1164 for t in threads:
1165 t.join()
1166 self.assertFalse(errors,
1167 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001168 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001169 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001170 s = f.read()
1171 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001172 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001173 finally:
1174 support.unlink(support.TESTFN)
1175
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001176 def test_misbehaved_io(self):
1177 rawio = self.MisbehavedRawIO()
1178 bufio = self.tp(rawio, 5)
1179 self.assertRaises(IOError, bufio.seek, 0)
1180 self.assertRaises(IOError, bufio.tell)
1181 self.assertRaises(IOError, bufio.write, b"abcdef")
1182
Benjamin Peterson59406a92009-03-26 17:10:29 +00001183 def test_max_buffer_size_deprecation(self):
Florent Xicluna8fbddf12010-03-17 20:29:51 +00001184 with support.check_warnings(("max_buffer_size is deprecated",
1185 DeprecationWarning)):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001186 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001187
1188
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001189class CBufferedWriterTest(BufferedWriterTest):
1190 tp = io.BufferedWriter
1191
1192 def test_constructor(self):
1193 BufferedWriterTest.test_constructor(self)
1194 # The allocation can succeed on 32-bit builds, e.g. with more
1195 # than 2GB RAM and a 64-bit kernel.
1196 if sys.maxsize > 0x7FFFFFFF:
1197 rawio = self.MockRawIO()
1198 bufio = self.tp(rawio)
1199 self.assertRaises((OverflowError, MemoryError, ValueError),
1200 bufio.__init__, rawio, sys.maxsize)
1201
1202 def test_initialization(self):
1203 rawio = self.MockRawIO()
1204 bufio = self.tp(rawio)
1205 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1206 self.assertRaises(ValueError, bufio.write, b"def")
1207 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1208 self.assertRaises(ValueError, bufio.write, b"def")
1209 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1210 self.assertRaises(ValueError, bufio.write, b"def")
1211
1212 def test_garbage_collection(self):
1213 # C BufferedWriter objects are collected, and collecting them flushes
1214 # all data to disk.
1215 # The Python version has __del__, so it ends into gc.garbage instead
1216 rawio = self.FileIO(support.TESTFN, "w+b")
1217 f = self.tp(rawio)
1218 f.write(b"123xxx")
1219 f.x = f
1220 wr = weakref.ref(f)
1221 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001222 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001223 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001224 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001225 self.assertEqual(f.read(), b"123xxx")
1226
1227
1228class PyBufferedWriterTest(BufferedWriterTest):
1229 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001230
Guido van Rossum01a27522007-03-07 01:00:12 +00001231class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001232
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001233 def test_constructor(self):
1234 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001235 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001236
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001237 def test_detach(self):
1238 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1239 self.assertRaises(self.UnsupportedOperation, pair.detach)
1240
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001241 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna8fbddf12010-03-17 20:29:51 +00001242 with support.check_warnings(("max_buffer_size is deprecated",
1243 DeprecationWarning)):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001244 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001245
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001246 def test_constructor_with_not_readable(self):
1247 class NotReadable(MockRawIO):
1248 def readable(self):
1249 return False
1250
1251 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1252
1253 def test_constructor_with_not_writeable(self):
1254 class NotWriteable(MockRawIO):
1255 def writable(self):
1256 return False
1257
1258 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1259
1260 def test_read(self):
1261 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1262
1263 self.assertEqual(pair.read(3), b"abc")
1264 self.assertEqual(pair.read(1), b"d")
1265 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001266 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1267 self.assertEqual(pair.read(None), b"abc")
1268
1269 def test_readlines(self):
1270 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1271 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1272 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1273 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001274
1275 def test_read1(self):
1276 # .read1() is delegated to the underlying reader object, so this test
1277 # can be shallow.
1278 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1279
1280 self.assertEqual(pair.read1(3), b"abc")
1281
1282 def test_readinto(self):
1283 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1284
1285 data = bytearray(5)
1286 self.assertEqual(pair.readinto(data), 5)
1287 self.assertEqual(data, b"abcde")
1288
1289 def test_write(self):
1290 w = self.MockRawIO()
1291 pair = self.tp(self.MockRawIO(), w)
1292
1293 pair.write(b"abc")
1294 pair.flush()
1295 pair.write(b"def")
1296 pair.flush()
1297 self.assertEqual(w._write_stack, [b"abc", b"def"])
1298
1299 def test_peek(self):
1300 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1301
1302 self.assertTrue(pair.peek(3).startswith(b"abc"))
1303 self.assertEqual(pair.read(3), b"abc")
1304
1305 def test_readable(self):
1306 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1307 self.assertTrue(pair.readable())
1308
1309 def test_writeable(self):
1310 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1311 self.assertTrue(pair.writable())
1312
1313 def test_seekable(self):
1314 # BufferedRWPairs are never seekable, even if their readers and writers
1315 # are.
1316 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1317 self.assertFalse(pair.seekable())
1318
1319 # .flush() is delegated to the underlying writer object and has been
1320 # tested in the test_write method.
1321
1322 def test_close_and_closed(self):
1323 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1324 self.assertFalse(pair.closed)
1325 pair.close()
1326 self.assertTrue(pair.closed)
1327
1328 def test_isatty(self):
1329 class SelectableIsAtty(MockRawIO):
1330 def __init__(self, isatty):
1331 MockRawIO.__init__(self)
1332 self._isatty = isatty
1333
1334 def isatty(self):
1335 return self._isatty
1336
1337 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1338 self.assertFalse(pair.isatty())
1339
1340 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1341 self.assertTrue(pair.isatty())
1342
1343 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1344 self.assertTrue(pair.isatty())
1345
1346 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1347 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001348
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001349class CBufferedRWPairTest(BufferedRWPairTest):
1350 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001351
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001352class PyBufferedRWPairTest(BufferedRWPairTest):
1353 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001354
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001355
1356class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1357 read_mode = "rb+"
1358 write_mode = "wb+"
1359
1360 def test_constructor(self):
1361 BufferedReaderTest.test_constructor(self)
1362 BufferedWriterTest.test_constructor(self)
1363
1364 def test_read_and_write(self):
1365 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001366 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001367
1368 self.assertEqual(b"as", rw.read(2))
1369 rw.write(b"ddd")
1370 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001371 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001372 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001373 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001374
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001375 def test_seek_and_tell(self):
1376 raw = self.BytesIO(b"asdfghjkl")
1377 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001378
Ezio Melottib3aedd42010-11-20 19:04:17 +00001379 self.assertEqual(b"as", rw.read(2))
1380 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001381 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001382 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00001383
1384 rw.write(b"asdf")
1385 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001386 self.assertEqual(b"asdfasdfl", rw.read())
1387 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001388 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001389 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001390 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001391 self.assertEqual(7, rw.tell())
1392 self.assertEqual(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001393 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001394
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001395 def check_flush_and_read(self, read_func):
1396 raw = self.BytesIO(b"abcdefghi")
1397 bufio = self.tp(raw)
1398
Ezio Melottib3aedd42010-11-20 19:04:17 +00001399 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001400 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001401 self.assertEqual(b"ef", read_func(bufio, 2))
1402 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001403 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001404 self.assertEqual(6, bufio.tell())
1405 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001406 raw.seek(0, 0)
1407 raw.write(b"XYZ")
1408 # flush() resets the read buffer
1409 bufio.flush()
1410 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001411 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001412
1413 def test_flush_and_read(self):
1414 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1415
1416 def test_flush_and_readinto(self):
1417 def _readinto(bufio, n=-1):
1418 b = bytearray(n if n >= 0 else 9999)
1419 n = bufio.readinto(b)
1420 return bytes(b[:n])
1421 self.check_flush_and_read(_readinto)
1422
1423 def test_flush_and_peek(self):
1424 def _peek(bufio, n=-1):
1425 # This relies on the fact that the buffer can contain the whole
1426 # raw stream, otherwise peek() can return less.
1427 b = bufio.peek(n)
1428 if n != -1:
1429 b = b[:n]
1430 bufio.seek(len(b), 1)
1431 return b
1432 self.check_flush_and_read(_peek)
1433
1434 def test_flush_and_write(self):
1435 raw = self.BytesIO(b"abcdefghi")
1436 bufio = self.tp(raw)
1437
1438 bufio.write(b"123")
1439 bufio.flush()
1440 bufio.write(b"45")
1441 bufio.flush()
1442 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001443 self.assertEqual(b"12345fghi", raw.getvalue())
1444 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001445
1446 def test_threads(self):
1447 BufferedReaderTest.test_threads(self)
1448 BufferedWriterTest.test_threads(self)
1449
1450 def test_writes_and_peek(self):
1451 def _peek(bufio):
1452 bufio.peek(1)
1453 self.check_writes(_peek)
1454 def _peek(bufio):
1455 pos = bufio.tell()
1456 bufio.seek(-1, 1)
1457 bufio.peek(1)
1458 bufio.seek(pos, 0)
1459 self.check_writes(_peek)
1460
1461 def test_writes_and_reads(self):
1462 def _read(bufio):
1463 bufio.seek(-1, 1)
1464 bufio.read(1)
1465 self.check_writes(_read)
1466
1467 def test_writes_and_read1s(self):
1468 def _read1(bufio):
1469 bufio.seek(-1, 1)
1470 bufio.read1(1)
1471 self.check_writes(_read1)
1472
1473 def test_writes_and_readintos(self):
1474 def _read(bufio):
1475 bufio.seek(-1, 1)
1476 bufio.readinto(bytearray(1))
1477 self.check_writes(_read)
1478
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001479 def test_write_after_readahead(self):
1480 # Issue #6629: writing after the buffer was filled by readahead should
1481 # first rewind the raw stream.
1482 for overwrite_size in [1, 5]:
1483 raw = self.BytesIO(b"A" * 10)
1484 bufio = self.tp(raw, 4)
1485 # Trigger readahead
1486 self.assertEqual(bufio.read(1), b"A")
1487 self.assertEqual(bufio.tell(), 1)
1488 # Overwriting should rewind the raw stream if it needs so
1489 bufio.write(b"B" * overwrite_size)
1490 self.assertEqual(bufio.tell(), overwrite_size + 1)
1491 # If the write size was smaller than the buffer size, flush() and
1492 # check that rewind happens.
1493 bufio.flush()
1494 self.assertEqual(bufio.tell(), overwrite_size + 1)
1495 s = raw.getvalue()
1496 self.assertEqual(s,
1497 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1498
Antoine Pitrou7c404892011-05-13 00:13:33 +02001499 def test_write_rewind_write(self):
1500 # Various combinations of reading / writing / seeking backwards / writing again
1501 def mutate(bufio, pos1, pos2):
1502 assert pos2 >= pos1
1503 # Fill the buffer
1504 bufio.seek(pos1)
1505 bufio.read(pos2 - pos1)
1506 bufio.write(b'\x02')
1507 # This writes earlier than the previous write, but still inside
1508 # the buffer.
1509 bufio.seek(pos1)
1510 bufio.write(b'\x01')
1511
1512 b = b"\x80\x81\x82\x83\x84"
1513 for i in range(0, len(b)):
1514 for j in range(i, len(b)):
1515 raw = self.BytesIO(b)
1516 bufio = self.tp(raw, 100)
1517 mutate(bufio, i, j)
1518 bufio.flush()
1519 expected = bytearray(b)
1520 expected[j] = 2
1521 expected[i] = 1
1522 self.assertEqual(raw.getvalue(), expected,
1523 "failed result for i=%d, j=%d" % (i, j))
1524
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001525 def test_truncate_after_read_or_write(self):
1526 raw = self.BytesIO(b"A" * 10)
1527 bufio = self.tp(raw, 100)
1528 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1529 self.assertEqual(bufio.truncate(), 2)
1530 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1531 self.assertEqual(bufio.truncate(), 4)
1532
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001533 def test_misbehaved_io(self):
1534 BufferedReaderTest.test_misbehaved_io(self)
1535 BufferedWriterTest.test_misbehaved_io(self)
1536
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001537 # You can't construct a BufferedRandom over a non-seekable stream.
1538 test_unseekable = None
1539
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001540class CBufferedRandomTest(BufferedRandomTest):
1541 tp = io.BufferedRandom
1542
1543 def test_constructor(self):
1544 BufferedRandomTest.test_constructor(self)
1545 # The allocation can succeed on 32-bit builds, e.g. with more
1546 # than 2GB RAM and a 64-bit kernel.
1547 if sys.maxsize > 0x7FFFFFFF:
1548 rawio = self.MockRawIO()
1549 bufio = self.tp(rawio)
1550 self.assertRaises((OverflowError, MemoryError, ValueError),
1551 bufio.__init__, rawio, sys.maxsize)
1552
1553 def test_garbage_collection(self):
1554 CBufferedReaderTest.test_garbage_collection(self)
1555 CBufferedWriterTest.test_garbage_collection(self)
1556
1557class PyBufferedRandomTest(BufferedRandomTest):
1558 tp = pyio.BufferedRandom
1559
1560
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001561# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1562# properties:
1563# - A single output character can correspond to many bytes of input.
1564# - The number of input bytes to complete the character can be
1565# undetermined until the last input byte is received.
1566# - The number of input bytes can vary depending on previous input.
1567# - A single input byte can correspond to many characters of output.
1568# - The number of output characters can be undetermined until the
1569# last input byte is received.
1570# - The number of output characters can vary depending on previous input.
1571
1572class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1573 """
1574 For testing seek/tell behavior with a stateful, buffering decoder.
1575
1576 Input is a sequence of words. Words may be fixed-length (length set
1577 by input) or variable-length (period-terminated). In variable-length
1578 mode, extra periods are ignored. Possible words are:
1579 - 'i' followed by a number sets the input length, I (maximum 99).
1580 When I is set to 0, words are space-terminated.
1581 - 'o' followed by a number sets the output length, O (maximum 99).
1582 - Any other word is converted into a word followed by a period on
1583 the output. The output word consists of the input word truncated
1584 or padded out with hyphens to make its length equal to O. If O
1585 is 0, the word is output verbatim without truncating or padding.
1586 I and O are initially set to 1. When I changes, any buffered input is
1587 re-scanned according to the new I. EOF also terminates the last word.
1588 """
1589
1590 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001591 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001592 self.reset()
1593
1594 def __repr__(self):
1595 return '<SID %x>' % id(self)
1596
1597 def reset(self):
1598 self.i = 1
1599 self.o = 1
1600 self.buffer = bytearray()
1601
1602 def getstate(self):
1603 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1604 return bytes(self.buffer), i*100 + o
1605
1606 def setstate(self, state):
1607 buffer, io = state
1608 self.buffer = bytearray(buffer)
1609 i, o = divmod(io, 100)
1610 self.i, self.o = i ^ 1, o ^ 1
1611
1612 def decode(self, input, final=False):
1613 output = ''
1614 for b in input:
1615 if self.i == 0: # variable-length, terminated with period
1616 if b == ord('.'):
1617 if self.buffer:
1618 output += self.process_word()
1619 else:
1620 self.buffer.append(b)
1621 else: # fixed-length, terminate after self.i bytes
1622 self.buffer.append(b)
1623 if len(self.buffer) == self.i:
1624 output += self.process_word()
1625 if final and self.buffer: # EOF terminates the last word
1626 output += self.process_word()
1627 return output
1628
1629 def process_word(self):
1630 output = ''
1631 if self.buffer[0] == ord('i'):
1632 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1633 elif self.buffer[0] == ord('o'):
1634 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1635 else:
1636 output = self.buffer.decode('ascii')
1637 if len(output) < self.o:
1638 output += '-'*self.o # pad out with hyphens
1639 if self.o:
1640 output = output[:self.o] # truncate to output length
1641 output += '.'
1642 self.buffer = bytearray()
1643 return output
1644
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001645 codecEnabled = False
1646
1647 @classmethod
1648 def lookupTestDecoder(cls, name):
1649 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001650 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001651 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001652 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001653 incrementalencoder=None,
1654 streamreader=None, streamwriter=None,
1655 incrementaldecoder=cls)
1656
1657# Register the previous decoder for testing.
1658# Disabled by default, tests will enable it.
1659codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1660
1661
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001662class StatefulIncrementalDecoderTest(unittest.TestCase):
1663 """
1664 Make sure the StatefulIncrementalDecoder actually works.
1665 """
1666
1667 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001668 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001669 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001670 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001671 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001672 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001673 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001674 # I=0, O=6 (variable-length input, fixed-length output)
1675 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1676 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001677 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001678 # I=6, O=3 (fixed-length input > fixed-length output)
1679 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1680 # I=0, then 3; O=29, then 15 (with longer output)
1681 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1682 'a----------------------------.' +
1683 'b----------------------------.' +
1684 'cde--------------------------.' +
1685 'abcdefghijabcde.' +
1686 'a.b------------.' +
1687 '.c.------------.' +
1688 'd.e------------.' +
1689 'k--------------.' +
1690 'l--------------.' +
1691 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001692 ]
1693
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001694 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001695 # Try a few one-shot test cases.
1696 for input, eof, output in self.test_cases:
1697 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001698 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001699
1700 # Also test an unfinished decode, followed by forcing EOF.
1701 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001702 self.assertEqual(d.decode(b'oiabcd'), '')
1703 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001704
1705class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001706
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001707 def setUp(self):
1708 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1709 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001710 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001711
Guido van Rossumd0712812007-04-11 16:32:43 +00001712 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001713 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001714
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001715 def test_constructor(self):
1716 r = self.BytesIO(b"\xc3\xa9\n\n")
1717 b = self.BufferedReader(r, 1000)
1718 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001719 t.__init__(b, encoding="latin-1", newline="\r\n")
1720 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001721 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001722 t.__init__(b, encoding="utf-8", line_buffering=True)
1723 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001724 self.assertEqual(t.line_buffering, True)
1725 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001726 self.assertRaises(TypeError, t.__init__, b, newline=42)
1727 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1728
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001729 def test_detach(self):
1730 r = self.BytesIO()
1731 b = self.BufferedWriter(r)
1732 t = self.TextIOWrapper(b)
1733 self.assertIs(t.detach(), b)
1734
1735 t = self.TextIOWrapper(b, encoding="ascii")
1736 t.write("howdy")
1737 self.assertFalse(r.getvalue())
1738 t.detach()
1739 self.assertEqual(r.getvalue(), b"howdy")
1740 self.assertRaises(ValueError, t.detach)
1741
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001742 def test_repr(self):
1743 raw = self.BytesIO("hello".encode("utf-8"))
1744 b = self.BufferedReader(raw)
1745 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001746 modname = self.TextIOWrapper.__module__
1747 self.assertEqual(repr(t),
1748 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1749 raw.name = "dummy"
1750 self.assertEqual(repr(t),
1751 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001752 t.mode = "r"
1753 self.assertEqual(repr(t),
1754 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001755 raw.name = b"dummy"
1756 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001757 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001758
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001759 def test_line_buffering(self):
1760 r = self.BytesIO()
1761 b = self.BufferedWriter(r, 1000)
1762 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001763 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001764 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001765 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001766 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001767 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001768 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001769
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001770 def test_encoding(self):
1771 # Check the encoding attribute is always set, and valid
1772 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001773 t = self.TextIOWrapper(b, encoding="utf-8")
1774 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001775 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001776 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001777 codecs.lookup(t.encoding)
1778
1779 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001780 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001781 b = self.BytesIO(b"abc\n\xff\n")
1782 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001783 self.assertRaises(UnicodeError, t.read)
1784 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001785 b = self.BytesIO(b"abc\n\xff\n")
1786 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001787 self.assertRaises(UnicodeError, t.read)
1788 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001789 b = self.BytesIO(b"abc\n\xff\n")
1790 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001791 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001792 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001793 b = self.BytesIO(b"abc\n\xff\n")
1794 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001795 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001796
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001797 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001798 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001799 b = self.BytesIO()
1800 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001801 self.assertRaises(UnicodeError, t.write, "\xff")
1802 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001803 b = self.BytesIO()
1804 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001805 self.assertRaises(UnicodeError, t.write, "\xff")
1806 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001807 b = self.BytesIO()
1808 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001809 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001810 t.write("abc\xffdef\n")
1811 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001812 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001813 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001814 b = self.BytesIO()
1815 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001816 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001817 t.write("abc\xffdef\n")
1818 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001819 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001820
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001821 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001822 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1823
1824 tests = [
1825 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001826 [ '', input_lines ],
1827 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1828 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1829 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001830 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001831 encodings = (
1832 'utf-8', 'latin-1',
1833 'utf-16', 'utf-16-le', 'utf-16-be',
1834 'utf-32', 'utf-32-le', 'utf-32-be',
1835 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001836
Guido van Rossum8358db22007-08-18 21:39:55 +00001837 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001838 # character in TextIOWrapper._pending_line.
1839 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001840 # XXX: str.encode() should return bytes
1841 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001842 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001843 for bufsize in range(1, 10):
1844 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001845 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1846 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001847 encoding=encoding)
1848 if do_reads:
1849 got_lines = []
1850 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001851 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001852 if c2 == '':
1853 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00001854 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001855 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001856 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001857 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001858
1859 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001860 self.assertEqual(got_line, exp_line)
1861 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00001862
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001863 def test_newlines_input(self):
1864 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001865 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1866 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001867 (None, normalized.decode("ascii").splitlines(True)),
1868 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001869 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1870 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1871 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001872 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001873 buf = self.BytesIO(testdata)
1874 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001875 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00001876 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001877 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00001878
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001879 def test_newlines_output(self):
1880 testdict = {
1881 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1882 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1883 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1884 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1885 }
1886 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1887 for newline, expected in tests:
1888 buf = self.BytesIO()
1889 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1890 txt.write("AAA\nB")
1891 txt.write("BB\nCCC\n")
1892 txt.write("X\rY\r\nZ")
1893 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001894 self.assertEqual(buf.closed, False)
1895 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001896
1897 def test_destructor(self):
1898 l = []
1899 base = self.BytesIO
1900 class MyBytesIO(base):
1901 def close(self):
1902 l.append(self.getvalue())
1903 base.close(self)
1904 b = MyBytesIO()
1905 t = self.TextIOWrapper(b, encoding="ascii")
1906 t.write("abc")
1907 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001908 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001909 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001910
1911 def test_override_destructor(self):
1912 record = []
1913 class MyTextIO(self.TextIOWrapper):
1914 def __del__(self):
1915 record.append(1)
1916 try:
1917 f = super().__del__
1918 except AttributeError:
1919 pass
1920 else:
1921 f()
1922 def close(self):
1923 record.append(2)
1924 super().close()
1925 def flush(self):
1926 record.append(3)
1927 super().flush()
1928 b = self.BytesIO()
1929 t = MyTextIO(b, encoding="ascii")
1930 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001931 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001932 self.assertEqual(record, [1, 2, 3])
1933
1934 def test_error_through_destructor(self):
1935 # Test that the exception state is not modified by a destructor,
1936 # even if close() fails.
1937 rawio = self.CloseFailureIO()
1938 def f():
1939 self.TextIOWrapper(rawio).xyzzy
1940 with support.captured_output("stderr") as s:
1941 self.assertRaises(AttributeError, f)
1942 s = s.getvalue().strip()
1943 if s:
1944 # The destructor *may* have printed an unraisable error, check it
1945 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001946 self.assertTrue(s.startswith("Exception IOError: "), s)
1947 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00001948
Guido van Rossum9b76da62007-04-11 01:09:03 +00001949 # Systematic tests of the text I/O API
1950
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001951 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001952 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001953 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001954 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001955 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00001956 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001957 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001958 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001959 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00001960 self.assertEqual(f.tell(), 0)
1961 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00001962 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001963 self.assertEqual(f.seek(0), 0)
1964 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001965 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001966 self.assertEqual(f.read(2), "ab")
1967 self.assertEqual(f.read(1), "c")
1968 self.assertEqual(f.read(1), "")
1969 self.assertEqual(f.read(), "")
1970 self.assertEqual(f.tell(), cookie)
1971 self.assertEqual(f.seek(0), 0)
1972 self.assertEqual(f.seek(0, 2), cookie)
1973 self.assertEqual(f.write("def"), 3)
1974 self.assertEqual(f.seek(cookie), cookie)
1975 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00001976 if enc.startswith("utf"):
1977 self.multi_line_test(f, enc)
1978 f.close()
1979
1980 def multi_line_test(self, f, enc):
1981 f.seek(0)
1982 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001983 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001984 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00001985 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001986 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00001987 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001988 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001989 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001990 wlines.append((f.tell(), line))
1991 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001992 f.seek(0)
1993 rlines = []
1994 while True:
1995 pos = f.tell()
1996 line = f.readline()
1997 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00001998 break
1999 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002000 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002001
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002002 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002003 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002004 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002005 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002006 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002007 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002008 p2 = f.tell()
2009 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002010 self.assertEqual(f.tell(), p0)
2011 self.assertEqual(f.readline(), "\xff\n")
2012 self.assertEqual(f.tell(), p1)
2013 self.assertEqual(f.readline(), "\xff\n")
2014 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002015 f.seek(0)
2016 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002017 self.assertEqual(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002018 self.assertRaises(IOError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002019 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002020 f.close()
2021
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002022 def test_seeking(self):
2023 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002024 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002025 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002026 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002027 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002028 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002029 suffix = bytes(u_suffix.encode("utf-8"))
2030 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002031 with self.open(support.TESTFN, "wb") as f:
2032 f.write(line*2)
2033 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2034 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002035 self.assertEqual(s, str(prefix, "ascii"))
2036 self.assertEqual(f.tell(), prefix_size)
2037 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002038
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002039 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002040 # Regression test for a specific bug
2041 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002042 with self.open(support.TESTFN, "wb") as f:
2043 f.write(data)
2044 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2045 f._CHUNK_SIZE # Just test that it exists
2046 f._CHUNK_SIZE = 2
2047 f.readline()
2048 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002049
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002050 def test_seek_and_tell(self):
2051 #Test seek/tell using the StatefulIncrementalDecoder.
2052 # Make test faster by doing smaller seeks
2053 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002054
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002055 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002056 """Tell/seek to various points within a data stream and ensure
2057 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002058 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002059 f.write(data)
2060 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002061 f = self.open(support.TESTFN, encoding='test_decoder')
2062 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002063 decoded = f.read()
2064 f.close()
2065
Neal Norwitze2b07052008-03-18 19:52:05 +00002066 for i in range(min_pos, len(decoded) + 1): # seek positions
2067 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002068 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002069 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002070 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002071 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002072 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002073 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002074 f.close()
2075
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002076 # Enable the test decoder.
2077 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002078
2079 # Run the tests.
2080 try:
2081 # Try each test case.
2082 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002083 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002084
2085 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002086 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2087 offset = CHUNK_SIZE - len(input)//2
2088 prefix = b'.'*offset
2089 # Don't bother seeking into the prefix (takes too long).
2090 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002091 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002092
2093 # Ensure our test decoder won't interfere with subsequent tests.
2094 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002095 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002096
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002097 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002098 data = "1234567890"
2099 tests = ("utf-16",
2100 "utf-16-le",
2101 "utf-16-be",
2102 "utf-32",
2103 "utf-32-le",
2104 "utf-32-be")
2105 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002106 buf = self.BytesIO()
2107 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002108 # Check if the BOM is written only once (see issue1753).
2109 f.write(data)
2110 f.write(data)
2111 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002112 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002113 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002114 self.assertEqual(f.read(), data * 2)
2115 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002116
Benjamin Petersona1b49012009-03-31 23:11:32 +00002117 def test_unreadable(self):
2118 class UnReadable(self.BytesIO):
2119 def readable(self):
2120 return False
2121 txt = self.TextIOWrapper(UnReadable())
2122 self.assertRaises(IOError, txt.read)
2123
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002124 def test_read_one_by_one(self):
2125 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002126 reads = ""
2127 while True:
2128 c = txt.read(1)
2129 if not c:
2130 break
2131 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002132 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002133
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002134 def test_readlines(self):
2135 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2136 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2137 txt.seek(0)
2138 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2139 txt.seek(0)
2140 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2141
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002142 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002143 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002144 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002145 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002146 reads = ""
2147 while True:
2148 c = txt.read(128)
2149 if not c:
2150 break
2151 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002152 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002153
2154 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002155 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002156
2157 # read one char at a time
2158 reads = ""
2159 while True:
2160 c = txt.read(1)
2161 if not c:
2162 break
2163 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002164 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002165
2166 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002167 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002168 txt._CHUNK_SIZE = 4
2169
2170 reads = ""
2171 while True:
2172 c = txt.read(4)
2173 if not c:
2174 break
2175 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002176 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002177
2178 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002179 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002180 txt._CHUNK_SIZE = 4
2181
2182 reads = txt.read(4)
2183 reads += txt.read(4)
2184 reads += txt.readline()
2185 reads += txt.readline()
2186 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002187 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002188
2189 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002190 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002191 txt._CHUNK_SIZE = 4
2192
2193 reads = txt.read(4)
2194 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002195 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002196
2197 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002198 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002199 txt._CHUNK_SIZE = 4
2200
2201 reads = txt.read(4)
2202 pos = txt.tell()
2203 txt.seek(0)
2204 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002205 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002206
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002207 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002208 buffer = self.BytesIO(self.testdata)
2209 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002210
2211 self.assertEqual(buffer.seekable(), txt.seekable())
2212
Antoine Pitroue4501852009-05-14 18:55:55 +00002213 def test_append_bom(self):
2214 # The BOM is not written again when appending to a non-empty file
2215 filename = support.TESTFN
2216 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2217 with self.open(filename, 'w', encoding=charset) as f:
2218 f.write('aaa')
2219 pos = f.tell()
2220 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002221 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002222
2223 with self.open(filename, 'a', encoding=charset) as f:
2224 f.write('xxx')
2225 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002226 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002227
2228 def test_seek_bom(self):
2229 # Same test, but when seeking manually
2230 filename = support.TESTFN
2231 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2232 with self.open(filename, 'w', encoding=charset) as f:
2233 f.write('aaa')
2234 pos = f.tell()
2235 with self.open(filename, 'r+', encoding=charset) as f:
2236 f.seek(pos)
2237 f.write('zzz')
2238 f.seek(0)
2239 f.write('bbb')
2240 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002241 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002242
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002243 def test_errors_property(self):
2244 with self.open(support.TESTFN, "w") as f:
2245 self.assertEqual(f.errors, "strict")
2246 with self.open(support.TESTFN, "w", errors="replace") as f:
2247 self.assertEqual(f.errors, "replace")
2248
Brett Cannon31f59292011-02-21 19:29:56 +00002249 @support.no_tracing
Victor Stinner45df8202010-04-28 22:31:17 +00002250 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002251 def test_threads_write(self):
2252 # Issue6750: concurrent writes could duplicate data
2253 event = threading.Event()
2254 with self.open(support.TESTFN, "w", buffering=1) as f:
2255 def run(n):
2256 text = "Thread%03d\n" % n
2257 event.wait()
2258 f.write(text)
2259 threads = [threading.Thread(target=lambda n=x: run(n))
2260 for x in range(20)]
2261 for t in threads:
2262 t.start()
2263 time.sleep(0.02)
2264 event.set()
2265 for t in threads:
2266 t.join()
2267 with self.open(support.TESTFN) as f:
2268 content = f.read()
2269 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002270 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002271
Antoine Pitrou6be88762010-05-03 16:48:20 +00002272 def test_flush_error_on_close(self):
2273 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2274 def bad_flush():
2275 raise IOError()
2276 txt.flush = bad_flush
2277 self.assertRaises(IOError, txt.close) # exception not swallowed
2278
2279 def test_multi_close(self):
2280 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2281 txt.close()
2282 txt.close()
2283 txt.close()
2284 self.assertRaises(ValueError, txt.flush)
2285
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002286 def test_unseekable(self):
2287 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2288 self.assertRaises(self.UnsupportedOperation, txt.tell)
2289 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2290
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002291 def test_readonly_attributes(self):
2292 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2293 buf = self.BytesIO(self.testdata)
2294 with self.assertRaises(AttributeError):
2295 txt.buffer = buf
2296
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002297class CTextIOWrapperTest(TextIOWrapperTest):
2298
2299 def test_initialization(self):
2300 r = self.BytesIO(b"\xc3\xa9\n\n")
2301 b = self.BufferedReader(r, 1000)
2302 t = self.TextIOWrapper(b)
2303 self.assertRaises(TypeError, t.__init__, b, newline=42)
2304 self.assertRaises(ValueError, t.read)
2305 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2306 self.assertRaises(ValueError, t.read)
2307
2308 def test_garbage_collection(self):
2309 # C TextIOWrapper objects are collected, and collecting them flushes
2310 # all data to disk.
2311 # The Python version has __del__, so it ends in gc.garbage instead.
2312 rawio = io.FileIO(support.TESTFN, "wb")
2313 b = self.BufferedWriter(rawio)
2314 t = self.TextIOWrapper(b, encoding="ascii")
2315 t.write("456def")
2316 t.x = t
2317 wr = weakref.ref(t)
2318 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002319 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002320 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002321 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002322 self.assertEqual(f.read(), b"456def")
2323
2324class PyTextIOWrapperTest(TextIOWrapperTest):
2325 pass
2326
2327
2328class IncrementalNewlineDecoderTest(unittest.TestCase):
2329
2330 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002331 # UTF-8 specific tests for a newline decoder
2332 def _check_decode(b, s, **kwargs):
2333 # We exercise getstate() / setstate() as well as decode()
2334 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002335 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002336 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002337 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002338
Antoine Pitrou180a3362008-12-14 16:36:46 +00002339 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002340
Antoine Pitrou180a3362008-12-14 16:36:46 +00002341 _check_decode(b'\xe8', "")
2342 _check_decode(b'\xa2', "")
2343 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002344
Antoine Pitrou180a3362008-12-14 16:36:46 +00002345 _check_decode(b'\xe8', "")
2346 _check_decode(b'\xa2', "")
2347 _check_decode(b'\x88', "\u8888")
2348
2349 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002350 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2351
Antoine Pitrou180a3362008-12-14 16:36:46 +00002352 decoder.reset()
2353 _check_decode(b'\n', "\n")
2354 _check_decode(b'\r', "")
2355 _check_decode(b'', "\n", final=True)
2356 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002357
Antoine Pitrou180a3362008-12-14 16:36:46 +00002358 _check_decode(b'\r', "")
2359 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002360
Antoine Pitrou180a3362008-12-14 16:36:46 +00002361 _check_decode(b'\r\r\n', "\n\n")
2362 _check_decode(b'\r', "")
2363 _check_decode(b'\r', "\n")
2364 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002365
Antoine Pitrou180a3362008-12-14 16:36:46 +00002366 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2367 _check_decode(b'\xe8\xa2\x88', "\u8888")
2368 _check_decode(b'\n', "\n")
2369 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2370 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002371
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002372 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002373 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002374 if encoding is not None:
2375 encoder = codecs.getincrementalencoder(encoding)()
2376 def _decode_bytewise(s):
2377 # Decode one byte at a time
2378 for b in encoder.encode(s):
2379 result.append(decoder.decode(bytes([b])))
2380 else:
2381 encoder = None
2382 def _decode_bytewise(s):
2383 # Decode one char at a time
2384 for c in s:
2385 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002386 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002387 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002388 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00002389 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002390 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002391 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002392 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002393 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002394 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002395 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002396 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002397 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002398 input = "abc"
2399 if encoder is not None:
2400 encoder.reset()
2401 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002402 self.assertEqual(decoder.decode(input), "abc")
2403 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002404
2405 def test_newline_decoder(self):
2406 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002407 # None meaning the IncrementalNewlineDecoder takes unicode input
2408 # rather than bytes input
2409 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002410 'utf-16', 'utf-16-le', 'utf-16-be',
2411 'utf-32', 'utf-32-le', 'utf-32-be',
2412 )
2413 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002414 decoder = enc and codecs.getincrementaldecoder(enc)()
2415 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2416 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002417 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002418 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2419 self.check_newline_decoding_utf8(decoder)
2420
Antoine Pitrou66913e22009-03-06 23:40:56 +00002421 def test_newline_bytes(self):
2422 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2423 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002424 self.assertEqual(dec.newlines, None)
2425 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2426 self.assertEqual(dec.newlines, None)
2427 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2428 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00002429 dec = self.IncrementalNewlineDecoder(None, translate=False)
2430 _check(dec)
2431 dec = self.IncrementalNewlineDecoder(None, translate=True)
2432 _check(dec)
2433
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002434class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2435 pass
2436
2437class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2438 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002439
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002440
Guido van Rossum01a27522007-03-07 01:00:12 +00002441# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002442
Guido van Rossum5abbf752007-08-27 17:39:33 +00002443class MiscIOTest(unittest.TestCase):
2444
Barry Warsaw40e82462008-11-20 20:14:50 +00002445 def tearDown(self):
2446 support.unlink(support.TESTFN)
2447
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002448 def test___all__(self):
2449 for name in self.io.__all__:
2450 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002451 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002452 if name == "open":
2453 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002454 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002455 self.assertTrue(issubclass(obj, Exception), name)
2456 elif not name.startswith("SEEK_"):
2457 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002458
Barry Warsaw40e82462008-11-20 20:14:50 +00002459 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002460 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002461 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002462 f.close()
2463
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002464 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002465 self.assertEqual(f.name, support.TESTFN)
2466 self.assertEqual(f.buffer.name, support.TESTFN)
2467 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2468 self.assertEqual(f.mode, "U")
2469 self.assertEqual(f.buffer.mode, "rb")
2470 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002471 f.close()
2472
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002473 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002474 self.assertEqual(f.mode, "w+")
2475 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2476 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002477
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002478 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002479 self.assertEqual(g.mode, "wb")
2480 self.assertEqual(g.raw.mode, "wb")
2481 self.assertEqual(g.name, f.fileno())
2482 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00002483 f.close()
2484 g.close()
2485
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002486 def test_io_after_close(self):
2487 for kwargs in [
2488 {"mode": "w"},
2489 {"mode": "wb"},
2490 {"mode": "w", "buffering": 1},
2491 {"mode": "w", "buffering": 2},
2492 {"mode": "wb", "buffering": 0},
2493 {"mode": "r"},
2494 {"mode": "rb"},
2495 {"mode": "r", "buffering": 1},
2496 {"mode": "r", "buffering": 2},
2497 {"mode": "rb", "buffering": 0},
2498 {"mode": "w+"},
2499 {"mode": "w+b"},
2500 {"mode": "w+", "buffering": 1},
2501 {"mode": "w+", "buffering": 2},
2502 {"mode": "w+b", "buffering": 0},
2503 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002504 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002505 f.close()
2506 self.assertRaises(ValueError, f.flush)
2507 self.assertRaises(ValueError, f.fileno)
2508 self.assertRaises(ValueError, f.isatty)
2509 self.assertRaises(ValueError, f.__iter__)
2510 if hasattr(f, "peek"):
2511 self.assertRaises(ValueError, f.peek, 1)
2512 self.assertRaises(ValueError, f.read)
2513 if hasattr(f, "read1"):
2514 self.assertRaises(ValueError, f.read1, 1024)
2515 if hasattr(f, "readinto"):
2516 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2517 self.assertRaises(ValueError, f.readline)
2518 self.assertRaises(ValueError, f.readlines)
2519 self.assertRaises(ValueError, f.seek, 0)
2520 self.assertRaises(ValueError, f.tell)
2521 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002522 self.assertRaises(ValueError, f.write,
2523 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002524 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002525 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002526
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002527 def test_blockingioerror(self):
2528 # Various BlockingIOError issues
2529 self.assertRaises(TypeError, self.BlockingIOError)
2530 self.assertRaises(TypeError, self.BlockingIOError, 1)
2531 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2532 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2533 b = self.BlockingIOError(1, "")
2534 self.assertEqual(b.characters_written, 0)
2535 class C(str):
2536 pass
2537 c = C("")
2538 b = self.BlockingIOError(1, c)
2539 c.b = b
2540 b.c = c
2541 wr = weakref.ref(c)
2542 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002543 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002544 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002545
2546 def test_abcs(self):
2547 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002548 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2549 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2550 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2551 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002552
2553 def _check_abc_inheritance(self, abcmodule):
2554 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002555 self.assertIsInstance(f, abcmodule.IOBase)
2556 self.assertIsInstance(f, abcmodule.RawIOBase)
2557 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2558 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002559 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002560 self.assertIsInstance(f, abcmodule.IOBase)
2561 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2562 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2563 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002564 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002565 self.assertIsInstance(f, abcmodule.IOBase)
2566 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2567 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2568 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002569
2570 def test_abc_inheritance(self):
2571 # Test implementations inherit from their respective ABCs
2572 self._check_abc_inheritance(self)
2573
2574 def test_abc_inheritance_official(self):
2575 # Test implementations inherit from the official ABCs of the
2576 # baseline "io" module.
2577 self._check_abc_inheritance(io)
2578
Antoine Pitroue033e062010-10-29 10:38:18 +00002579 def _check_warn_on_dealloc(self, *args, **kwargs):
2580 f = open(*args, **kwargs)
2581 r = repr(f)
2582 with self.assertWarns(ResourceWarning) as cm:
2583 f = None
2584 support.gc_collect()
2585 self.assertIn(r, str(cm.warning.args[0]))
2586
2587 def test_warn_on_dealloc(self):
2588 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
2589 self._check_warn_on_dealloc(support.TESTFN, "wb")
2590 self._check_warn_on_dealloc(support.TESTFN, "w")
2591
2592 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
2593 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00002594 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00002595 for fd in fds:
2596 try:
2597 os.close(fd)
2598 except EnvironmentError as e:
2599 if e.errno != errno.EBADF:
2600 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00002601 self.addCleanup(cleanup_fds)
2602 r, w = os.pipe()
2603 fds += r, w
2604 self._check_warn_on_dealloc(r, *args, **kwargs)
2605 # When using closefd=False, there's no warning
2606 r, w = os.pipe()
2607 fds += r, w
2608 with warnings.catch_warnings(record=True) as recorded:
2609 open(r, *args, closefd=False, **kwargs)
2610 support.gc_collect()
2611 self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00002612
2613 def test_warn_on_dealloc_fd(self):
2614 self._check_warn_on_dealloc_fd("rb", buffering=0)
2615 self._check_warn_on_dealloc_fd("rb")
2616 self._check_warn_on_dealloc_fd("r")
2617
2618
Antoine Pitrou243757e2010-11-05 21:15:39 +00002619 def test_pickling(self):
2620 # Pickling file objects is forbidden
2621 for kwargs in [
2622 {"mode": "w"},
2623 {"mode": "wb"},
2624 {"mode": "wb", "buffering": 0},
2625 {"mode": "r"},
2626 {"mode": "rb"},
2627 {"mode": "rb", "buffering": 0},
2628 {"mode": "w+"},
2629 {"mode": "w+b"},
2630 {"mode": "w+b", "buffering": 0},
2631 ]:
2632 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
2633 with self.open(support.TESTFN, **kwargs) as f:
2634 self.assertRaises(TypeError, pickle.dumps, f, protocol)
2635
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002636class CMiscIOTest(MiscIOTest):
2637 io = io
2638
2639class PyMiscIOTest(MiscIOTest):
2640 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002641
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002642
2643@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2644class SignalsTest(unittest.TestCase):
2645
2646 def setUp(self):
2647 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2648
2649 def tearDown(self):
2650 signal.signal(signal.SIGALRM, self.oldalrm)
2651
2652 def alarm_interrupt(self, sig, frame):
2653 1/0
2654
2655 @unittest.skipUnless(threading, 'Threading required for this test.')
2656 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2657 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00002658 invokes the signal handler, and bubbles up the exception raised
2659 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002660 read_results = []
2661 def _read():
Victor Stinnera9293352011-04-30 15:21:58 +02002662 if hasattr(signal, 'pthread_sigmask'):
2663 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002664 s = os.read(r, 1)
2665 read_results.append(s)
2666 t = threading.Thread(target=_read)
2667 t.daemon = True
2668 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00002669 fdopen_kwargs["closefd"] = False
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002670 try:
2671 wio = self.io.open(w, **fdopen_kwargs)
2672 t.start()
2673 signal.alarm(1)
2674 # Fill the pipe enough that the write will be blocking.
2675 # It will be interrupted by the timer armed above. Since the
2676 # other thread has read one byte, the low-level write will
2677 # return with a successful (partial) result rather than an EINTR.
2678 # The buffered IO layer must check for pending signal
2679 # handlers, which in this case will invoke alarm_interrupt().
2680 self.assertRaises(ZeroDivisionError,
2681 wio.write, item * (1024 * 1024))
2682 t.join()
2683 # We got one byte, get another one and check that it isn't a
2684 # repeat of the first one.
2685 read_results.append(os.read(r, 1))
2686 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2687 finally:
2688 os.close(w)
2689 os.close(r)
2690 # This is deliberate. If we didn't close the file descriptor
2691 # before closing wio, wio would try to flush its internal
2692 # buffer, and block again.
2693 try:
2694 wio.close()
2695 except IOError as e:
2696 if e.errno != errno.EBADF:
2697 raise
2698
2699 def test_interrupted_write_unbuffered(self):
2700 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2701
2702 def test_interrupted_write_buffered(self):
2703 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2704
2705 def test_interrupted_write_text(self):
2706 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2707
Brett Cannon31f59292011-02-21 19:29:56 +00002708 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002709 def check_reentrant_write(self, data, **fdopen_kwargs):
2710 def on_alarm(*args):
2711 # Will be called reentrantly from the same thread
2712 wio.write(data)
2713 1/0
2714 signal.signal(signal.SIGALRM, on_alarm)
2715 r, w = os.pipe()
2716 wio = self.io.open(w, **fdopen_kwargs)
2717 try:
2718 signal.alarm(1)
2719 # Either the reentrant call to wio.write() fails with RuntimeError,
2720 # or the signal handler raises ZeroDivisionError.
2721 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
2722 while 1:
2723 for i in range(100):
2724 wio.write(data)
2725 wio.flush()
2726 # Make sure the buffer doesn't fill up and block further writes
2727 os.read(r, len(data) * 100)
2728 exc = cm.exception
2729 if isinstance(exc, RuntimeError):
2730 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
2731 finally:
2732 wio.close()
2733 os.close(r)
2734
2735 def test_reentrant_write_buffered(self):
2736 self.check_reentrant_write(b"xy", mode="wb")
2737
2738 def test_reentrant_write_text(self):
2739 self.check_reentrant_write("xy", mode="w", encoding="ascii")
2740
Antoine Pitrou707ce822011-02-25 21:24:11 +00002741 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
2742 """Check that a buffered read, when it gets interrupted (either
2743 returning a partial result or EINTR), properly invokes the signal
2744 handler and retries if the latter returned successfully."""
2745 r, w = os.pipe()
2746 fdopen_kwargs["closefd"] = False
2747 def alarm_handler(sig, frame):
2748 os.write(w, b"bar")
2749 signal.signal(signal.SIGALRM, alarm_handler)
2750 try:
2751 rio = self.io.open(r, **fdopen_kwargs)
2752 os.write(w, b"foo")
2753 signal.alarm(1)
2754 # Expected behaviour:
2755 # - first raw read() returns partial b"foo"
2756 # - second raw read() returns EINTR
2757 # - third raw read() returns b"bar"
2758 self.assertEqual(decode(rio.read(6)), "foobar")
2759 finally:
2760 rio.close()
2761 os.close(w)
2762 os.close(r)
2763
2764 def test_interrupterd_read_retry_buffered(self):
2765 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
2766 mode="rb")
2767
2768 def test_interrupterd_read_retry_text(self):
2769 self.check_interrupted_read_retry(lambda x: x,
2770 mode="r")
2771
2772 @unittest.skipUnless(threading, 'Threading required for this test.')
2773 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
2774 """Check that a buffered write, when it gets interrupted (either
2775 returning a partial result or EINTR), properly invokes the signal
2776 handler and retries if the latter returned successfully."""
2777 select = support.import_module("select")
2778 # A quantity that exceeds the buffer size of an anonymous pipe's
2779 # write end.
2780 N = 1024 * 1024
2781 r, w = os.pipe()
2782 fdopen_kwargs["closefd"] = False
2783 # We need a separate thread to read from the pipe and allow the
2784 # write() to finish. This thread is started after the SIGALRM is
2785 # received (forcing a first EINTR in write()).
2786 read_results = []
2787 write_finished = False
2788 def _read():
2789 while not write_finished:
2790 while r in select.select([r], [], [], 1.0)[0]:
2791 s = os.read(r, 1024)
2792 read_results.append(s)
2793 t = threading.Thread(target=_read)
2794 t.daemon = True
2795 def alarm1(sig, frame):
2796 signal.signal(signal.SIGALRM, alarm2)
2797 signal.alarm(1)
2798 def alarm2(sig, frame):
2799 t.start()
2800 signal.signal(signal.SIGALRM, alarm1)
2801 try:
2802 wio = self.io.open(w, **fdopen_kwargs)
2803 signal.alarm(1)
2804 # Expected behaviour:
2805 # - first raw write() is partial (because of the limited pipe buffer
2806 # and the first alarm)
2807 # - second raw write() returns EINTR (because of the second alarm)
2808 # - subsequent write()s are successful (either partial or complete)
2809 self.assertEqual(N, wio.write(item * N))
2810 wio.flush()
2811 write_finished = True
2812 t.join()
2813 self.assertEqual(N, sum(len(x) for x in read_results))
2814 finally:
2815 write_finished = True
2816 os.close(w)
2817 os.close(r)
2818 # This is deliberate. If we didn't close the file descriptor
2819 # before closing wio, wio would try to flush its internal
2820 # buffer, and could block (in case of failure).
2821 try:
2822 wio.close()
2823 except IOError as e:
2824 if e.errno != errno.EBADF:
2825 raise
2826
2827 def test_interrupterd_write_retry_buffered(self):
2828 self.check_interrupted_write_retry(b"x", mode="wb")
2829
2830 def test_interrupterd_write_retry_text(self):
2831 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
2832
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002833
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002834class CSignalsTest(SignalsTest):
2835 io = io
2836
2837class PySignalsTest(SignalsTest):
2838 io = pyio
2839
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002840 # Handling reentrancy issues would slow down _pyio even more, so the
2841 # tests are disabled.
2842 test_reentrant_write_buffered = None
2843 test_reentrant_write_text = None
2844
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002845
Guido van Rossum28524c72007-02-27 05:47:44 +00002846def test_main():
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002847 tests = (CIOTest, PyIOTest,
2848 CBufferedReaderTest, PyBufferedReaderTest,
2849 CBufferedWriterTest, PyBufferedWriterTest,
2850 CBufferedRWPairTest, PyBufferedRWPairTest,
2851 CBufferedRandomTest, PyBufferedRandomTest,
2852 StatefulIncrementalDecoderTest,
2853 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2854 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002855 CMiscIOTest, PyMiscIOTest,
2856 CSignalsTest, PySignalsTest,
2857 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002858
2859 # Put the namespaces of the IO module we are testing and some useful mock
2860 # classes in the __dict__ of each test.
2861 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou328ec742010-09-14 18:37:24 +00002862 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002863 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2864 c_io_ns = {name : getattr(io, name) for name in all_members}
2865 py_io_ns = {name : getattr(pyio, name) for name in all_members}
2866 globs = globals()
2867 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2868 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2869 # Avoid turning open into a bound method.
2870 py_io_ns["open"] = pyio.OpenWrapper
2871 for test in tests:
2872 if test.__name__.startswith("C"):
2873 for name, obj in c_io_ns.items():
2874 setattr(test, name, obj)
2875 elif test.__name__.startswith("Py"):
2876 for name, obj in py_io_ns.items():
2877 setattr(test, name, obj)
2878
2879 support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002880
2881if __name__ == "__main__":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002882 test_main()