blob: bcf435fa1fe2ede627adc57ee3e064dd8671bafb [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)

################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import random
Guido van Rossum28524c72007-02-27 05:47:44 +000027import unittest
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000028import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000029import abc
Antoine Pitroub46b9d52010-08-21 19:09:32 +000030import signal
31import errno
Antoine Pitroue033e062010-10-29 10:38:18 +000032import warnings
Antoine Pitrou243757e2010-11-05 21:15:39 +000033import pickle
Georg Brandl1b37e872010-03-14 10:45:50 +000034from itertools import cycle, count
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000035from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000036from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000037
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000038import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000039import io # C implementation of io
40import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000041try:
42 import threading
43except ImportError:
44 threading = None
Guido van Rossum28524c72007-02-27 05:47:44 +000045
Guido van Rossuma9e20242007-03-08 00:43:48 +000046
def _default_chunk_size():
    """Return the ``_CHUNK_SIZE`` of a text file object opened with open().

    This module's own source file is opened (read-only, latin1) purely to
    obtain such an object; the file is closed again before returning.
    """
    text_file = open(__file__, "r", encoding="latin1")
    try:
        chunk_size = text_file._CHUNK_SIZE
    finally:
        text_file.close()
    return chunk_size
51
52
class MockRawIOWithoutRead:
    """Raw stream mock that deliberately has no read() method.

    Because only readinto() is provided, a RawIOBase subclass built on this
    mixin falls back to the default RawIOBase.read(), which is implemented
    in terms of readinto().

    ``read_stack`` is a sequence of chunks handed out in order; a ``None``
    entry makes readinto() return None once.  Written data is collected in
    ``_write_stack``.  ``_reads`` counts readinto() calls and
    ``_extraneous_reads`` counts the calls made after the chunks ran out.
    """

    def __init__(self, read_stack=()):
        self._reads = 0
        self._extraneous_reads = 0
        self._write_stack = []
        self._read_stack = list(read_stack)

    # -- capabilities ------------------------------------------------------

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def fileno(self):
        return 42

    # -- positioning (dummy values only) -----------------------------------

    def seek(self, pos, whence):
        # Not a real position, but something has to be returned.
        return 0

    def tell(self):
        # Same remark as for seek().
        return 0

    def truncate(self, pos=None):
        return pos

    # -- data transfer -----------------------------------------------------

    def write(self, b):
        self._write_stack.append(bytes(b))
        return len(b)

    def readinto(self, buf):
        self._reads += 1
        capacity = len(buf)
        pending = self._read_stack
        if not pending:
            self._extraneous_reads += 1
            return 0
        chunk = pending[0]
        if chunk is None:
            pending.pop(0)
            return None
        size = len(chunk)
        if size > capacity:
            # Hand out what fits and keep the remainder queued.
            buf[:] = chunk[:capacity]
            pending[0] = chunk[capacity:]
            return capacity
        pending.pop(0)
        buf[:size] = chunk
        return size


class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead on top of the C RawIOBase."""


class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead on top of the pure-Python RawIOBase."""
114
115
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus an explicit read().

    read() hands out the queued chunks one per call and ignores the
    requested size ``n``.
    """

    def read(self, n=None):
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # The queue is exhausted: record the extra read and report EOF.
            # Only IndexError is caught so that unrelated failures (and
            # KeyboardInterrupt) are no longer silently swallowed.
            self._extraneous_reads += 1
            return b""


class CMockRawIO(MockRawIO, io.RawIOBase):
    pass


class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000131
Guido van Rossuma9e20242007-03-08 00:43:48 +0000132
class MisbehavedRawIO(MockRawIO):
    """Raw stream mock whose return values contradict what really happened.

    write() reports twice the number of bytes written, read() returns the
    data doubled, seek()/tell() return negative positions and readinto()
    claims to have filled five times the size of the buffer.
    """

    def write(self, b):
        written = super().write(b)
        return written * 2

    def read(self, n=None):
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # The real transfer still takes place; only the reported count lies.
        super().readinto(buf)
        bogus_count = len(buf) * 5
        return bogus_count


class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO on top of the C RawIOBase."""


class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO on top of the pure-Python RawIOBase."""
155
156
class CloseFailureIO(MockRawIO):
    """Raw stream mock whose first close() fails.

    The first call to close() marks the object as closed and then raises
    IOError; every later call returns without doing anything.
    """

    closed = 0

    def close(self):
        if self.closed:
            return
        self.closed = 1
        raise IOError


class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO on top of the C RawIOBase."""


class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO on top of the pure-Python RawIOBase."""
170
171
class MockFileIO:
    """Mixin that records the outcome of every read()/readinto() call.

    ``read_history`` receives, per call, the length of the data returned by
    read() (or None when read() returned None) and the raw return value of
    readinto().  The actual I/O is delegated to the next class in the MRO.
    """

    def __init__(self, data):
        # The history list must exist before the parent initializer runs.
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        result = super().read(n)
        if result is None:
            outcome = None
        else:
            outcome = len(result)
        self.read_history.append(outcome)
        return result

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count


class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO backed by the C BytesIO."""


class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO backed by the pure-Python BytesIO."""
193
194
class MockUnseekableIO:
    """Mixin turning a stream into one that refuses to seek.

    seekable() returns False, and seek()/tell() raise the exception class
    found in the ``UnsupportedOperation`` attribute, which this mixin does
    not define itself; the concrete subclasses below set it.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        unsupported = self.UnsupportedOperation
        raise unsupported("not seekable")

    def tell(self, *args):
        unsupported = self.UnsupportedOperation
        raise unsupported("not seekable")


class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    # Exception class raised by seek()/tell() for the C implementation.
    UnsupportedOperation = io.UnsupportedOperation


class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    # Exception class raised by seek()/tell() for the Python implementation.
    UnsupportedOperation = pyio.UnsupportedOperation
210
211
class MockNonBlockWriterIO:
    """Writable raw stream mock that can "block" in the middle of a write.

    After block_on(char), the next write() whose data contains ``char``
    stores only the bytes before it and raises the exception class found in
    the ``BlockingIOError`` attribute (set by the concrete subclasses below),
    passing the number of stored bytes as third argument.  The trigger is
    cleared when it fires.
    """

    def __init__(self):
        self._blocker_char = None
        self._write_stack = []

    def pop_written(self):
        # Return everything stored so far as one bytes object and empty the
        # store in place.
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def block_on(self, char):
        """Arrange for write() to block when ``char`` is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def write(self, b):
        data = bytes(b)
        blocker = self._blocker_char
        if blocker:
            cut = data.find(blocker)
            if cut != -1:
                # Keep only what precedes the blocker, then report how many
                # bytes were accepted through the exception.
                self._blocker_char = None
                self._write_stack.append(data[:cut])
                raise self.BlockingIOError(0, "test blocking", cut)
        self._write_stack.append(data)
        return len(data)


class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    # Exception class raised by write() for the C implementation.
    BlockingIOError = io.BlockingIOError


class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    # Exception class raised by write() for the Python implementation.
    BlockingIOError = pyio.BlockingIOError
256
Guido van Rossuma9e20242007-03-08 00:43:48 +0000257
class IOTest(unittest.TestCase):
    # Base test case.  The objects under test (open, FileIO, BytesIO, IOBase,
    # RawIOBase, BufferedIOBase, TextIOBase, UnsupportedOperation, SEEK_CUR,
    # SEEK_END, MockRawIOWithoutRead) are looked up as attributes of self;
    # none of them is defined in this class, so they must be supplied from
    # outside it.
    #
    # Comments are used instead of docstrings on the test methods so that
    # unittest keeps reporting tests by method name.

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        # Helper: run a fixed sequence of write/seek/tell/truncate calls on
        # the writable binary stream f, checking every return value.
        self.assertEqual(f.write(b"blah."), 5)
        f.truncate(0)
        # truncate() must not move the stream position.
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        # Again, truncate() leaves the position where it was.
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        # Helper: run a fixed sequence of read/readinto/seek calls on the
        # readable binary stream f, which must contain b"hello world\n".
        # With buffered=True, argument-less read() calls are checked too.
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by the large file tests.
    LARGE = 2**31

    def large_file_ops(self, f):
        # Helper: seek/write/truncate around the LARGE offset on the
        # read-write binary stream f.
        # assertTrue rather than a bare assert, so the checks survive -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        # truncate() must not move the stream position.
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        exc = self.UnsupportedOperation
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(exc, fp.read)
                self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "wb", buffering=0) as fp:
            self.assertRaises(exc, fp.read)
            self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "rb", buffering=0) as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(exc, fp.write, "blah")
            self.assertRaises(exc, fp.writelines, ["blah\n"])
            # Non-zero seeking from current or end pos
            self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
            self.assertRaises(exc, fp.seek, -1, self.SEEK_END)

    def test_raw_file_io(self):
        # buffering=0 requests an unbuffered (raw) file object.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space,
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                # Report a real skip instead of printing and passing.
                self.skipTest(
                    "large file ops skipped on %s: requires %d bytes and a "
                    "long time; use 'regrtest.py -u largefile test_io' to "
                    "run it" % (sys.platform, self.LARGE))
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # The file must be closed on leaving the with block, whether the
        # block finishes normally or raises.
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        # record collects 1/2/3 as __del__/close/flush run, so the final
        # comparison checks both that all three ran and in which order.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        with support.check_warnings(('', ResourceWarning)):
            f = MyFileIO(support.TESTFN, "wb")
            f.write(b"xxx")
            del f
            support.gc_collect()
            self.assertEqual(record, [1, 2, 3])
            with self.open(support.TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        # Helper: subclass the given base class and check that destroying an
        # instance runs __del__, close and flush, in that order.
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        # write() must accept an array and report its size in bytes.
        a = array.array('i', range(10))
        n = len(a.tobytes())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        with support.check_warnings(('', ResourceWarning)):
            f = self.FileIO(support.TESTFN, "wb")
            f.write(b"abcxxx")
            # Create a reference cycle through the object itself.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed

    def test_multi_close(self):
        # Closing repeatedly is allowed; using the closed file is not.
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")
611 self.assertEqual(rawio.read(2), b"")
612
class CIOTest(IOTest):
    """IOTest subclass for the C implementation.

    Adds nothing of its own; the attributes IOTest looks up on self are not
    set in this class body.
    """


class PyIOTest(IOTest):
    """IOTest subclass for the Python implementation.

    Adds nothing of its own; the attributes IOTest looks up on self are not
    set in this class body.
    """
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000618
Guido van Rossuma9e20242007-03-08 00:43:48 +0000619
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000620class CommonBufferedTests:
621 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
622
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000623 def test_detach(self):
624 raw = self.MockRawIO()
625 buf = self.tp(raw)
626 self.assertIs(buf.detach(), raw)
627 self.assertRaises(ValueError, buf.detach)
628
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000629 def test_fileno(self):
630 rawio = self.MockRawIO()
631 bufio = self.tp(rawio)
632
Ezio Melottib3aedd42010-11-20 19:04:17 +0000633 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000634
635 def test_no_fileno(self):
636 # XXX will we always have fileno() function? If so, kill
637 # this test. Else, write it.
638 pass
639
640 def test_invalid_args(self):
641 rawio = self.MockRawIO()
642 bufio = self.tp(rawio)
643 # Invalid whence
644 self.assertRaises(ValueError, bufio.seek, 0, -1)
645 self.assertRaises(ValueError, bufio.seek, 0, 3)
646
647 def test_override_destructor(self):
648 tp = self.tp
649 record = []
650 class MyBufferedIO(tp):
651 def __del__(self):
652 record.append(1)
653 try:
654 f = super().__del__
655 except AttributeError:
656 pass
657 else:
658 f()
659 def close(self):
660 record.append(2)
661 super().close()
662 def flush(self):
663 record.append(3)
664 super().flush()
665 rawio = self.MockRawIO()
666 bufio = MyBufferedIO(rawio)
667 writable = bufio.writable()
668 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000669 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000670 if writable:
671 self.assertEqual(record, [1, 2, 3])
672 else:
673 self.assertEqual(record, [1, 2])
674
675 def test_context_manager(self):
676 # Test usability as a context manager
677 rawio = self.MockRawIO()
678 bufio = self.tp(rawio)
679 def _with():
680 with bufio:
681 pass
682 _with()
683 # bufio should now be closed, and using it a second time should raise
684 # a ValueError.
685 self.assertRaises(ValueError, _with)
686
687 def test_error_through_destructor(self):
688 # Test that the exception state is not modified by a destructor,
689 # even if close() fails.
690 rawio = self.CloseFailureIO()
691 def f():
692 self.tp(rawio).xyzzy
693 with support.captured_output("stderr") as s:
694 self.assertRaises(AttributeError, f)
695 s = s.getvalue().strip()
696 if s:
697 # The destructor *may* have printed an unraisable error, check it
698 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000699 self.assertTrue(s.startswith("Exception IOError: "), s)
700 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000701
Antoine Pitrou716c4442009-05-23 19:04:03 +0000702 def test_repr(self):
703 raw = self.MockRawIO()
704 b = self.tp(raw)
705 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
706 self.assertEqual(repr(b), "<%s>" % clsname)
707 raw.name = "dummy"
708 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
709 raw.name = b"dummy"
710 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
711
def test_flush_error_on_close(self):
    # An error raised by the raw stream's flush() must come out of close()
    # rather than being swallowed.
    raw = self.MockRawIO()

    def failing_flush():
        raise IOError()

    raw.flush = failing_flush
    buffered = self.tp(raw)
    with self.assertRaises(IOError):
        buffered.close()
719
def test_multi_close(self):
    # close() may be called any number of times; flush() on the closed
    # object must raise ValueError.
    raw = self.MockRawIO()
    buffered = self.tp(raw)
    for _ in range(3):
        buffered.close()
    with self.assertRaises(ValueError):
        buffered.flush()
727
def test_unseekable(self):
    # tell() and seek() on top of an unseekable raw stream must raise
    # UnsupportedOperation.
    raw = self.MockUnseekableIO(b"A" * 10)
    buffered = self.tp(raw)
    unsupported = self.UnsupportedOperation
    with self.assertRaises(unsupported):
        buffered.tell()
    with self.assertRaises(unsupported):
        buffered.seek(0)
732
Guido van Rossum78892e42007-04-06 17:31:18 +0000733
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Read-side tests shared by the buffered reader implementations.
    # Concrete subclasses bind the class under test to ``tp``; the mock raw
    # streams and ``open`` are looked up on ``self`` and defined elsewhere.
    read_mode = "rb"

    def test_constructor(self):
        # __init__() can be re-run on an existing object, and a buffer_size
        # that is zero or negative is rejected with ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        # After the rejected calls, a valid __init__() on a fresh raw stream
        # must leave the object readable.
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_read(self):
        # read(None) and read(7) must both return all seven bytes.
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # The expected ``_reads`` values are checked after each call;
        # presumably that attribute counts reads issued to the raw stream
        # (it is maintained by the mock, defined elsewhere).
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        # Repeatedly fill a two-byte buffer.  A short read only overwrites
        # the bytes it returns, hence the leftover "f" in b"gf".
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")

    def test_readlines(self):
        # Each assertion gets a fresh reader over the same three chunks.
        def bufio():
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each row: buffer size, sizes passed to bufio.read(), and the
        # expected ``read_history`` of the raw stream afterwards.
        tests = [
            [100, [3, 1, 4, 8], [dlen, 0]],
            [100, [3, 3, 3], [dlen]],
            [4, [1, 2, 4, 2], [4, 4, 1]],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        # assertIsNone reports the actual value when the check fails.
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

    def test_read_past_eof(self):
        # Asking for more than is available returns what there is.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(values)
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode,
                           buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []

                def reader():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            chunk = bufio.read(n)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as e:
                        # Record the failure so the main thread can report
                        # it, then let it propagate in this thread.
                        errors.append(e)
                        raise

                threads = [threading.Thread(target=reader)
                           for _ in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02)  # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                combined = b''.join(results)
                # Every byte value must have been read exactly N times.
                for i in range(256):
                    self.assertEqual(combined.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # seek() and tell() over the misbehaving raw stream must raise
        # IOError.
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            rawio = self.MockRawIO([b"x" * n])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            # Simple case: one raw read is enough to satisfy the request.
            self.assertEqual(rawio._extraneous_reads, 0,
                "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            self.assertEqual(rawio._extraneous_reads, 0,
                "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
912
913
class CBufferedReaderTest(BufferedReaderTest):
    # Runs the reader tests against io.BufferedReader and adds checks that
    # are only made for this implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After every rejected __init__() call, read() must raise
        # ValueError as well.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # Opening with "w+b" creates TESTFN on disk; remove it once the
        # test is over so it does not leak into later tests.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Put the object in a reference cycle with itself, so that it is
        # still referenced after ``del f``.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000954
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the inherited reader tests against pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000957
Guido van Rossuma9e20242007-03-08 00:43:48 +0000958
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Write-side tests shared by the buffered writer implementations.
    # Concrete subclasses bind the class under test to ``tp``; the mock raw
    # streams and ``open`` are looked up on ``self`` and defined elsewhere.
    write_mode = "wb"

    def test_constructor(self):
        # __init__() can be re-run on an existing object, and a buffer_size
        # that is zero or negative is rejected with ValueError.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        # After the rejected calls, a valid __init__() must leave the
        # object writable, with both writes reaching the raw stream.
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # detach() must push pending data to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        # Write 16 bytes in chunks of three through an 8-byte buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Helper: ``intermediate_func(bufio)`` is called after every write
        # and must not change what ends up in the raw stream.
        # Lots of writes, test the flushed output is as expected.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            # Never write past the end of ``contents``.
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seek around after each write, always returning to the position
        # the write left off at: first with absolute seeks, then relative.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        # Overwrite the start of already written data after seeking back.
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_destructor(self):
        # Dropping the last reference must flush buffered data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # The test creates TESTFN on disk; remove it afterwards so it does
        # not leak into later tests.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            # Truncating does not move the stream position.
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode,
                           buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []

                def writer():
                    try:
                        while True:
                            try:
                                chunk = queue.popleft()
                            except IndexError:
                                # Queue drained: this thread is done.
                                return
                            bufio.write(chunk)
                    except Exception as e:
                        # Record the failure so the main thread can report
                        # it, then let it propagate in this thread.
                        errors.append(e)
                        raise

                threads = [threading.Thread(target=writer)
                           for _ in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02)  # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Every byte value must have been written exactly N times.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # seek(), tell() and an overflowing write() over the misbehaving raw
        # stream must raise IOError.
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing a third positional argument must emit the deprecation
        # warning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001174
1175
class CBufferedWriterTest(BufferedWriterTest):
    # Runs the writer tests against io.BufferedWriter and adds checks that
    # are only made for this implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After every rejected __init__() call, write() must raise
        # ValueError as well.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The test creates TESTFN on disk; remove it afterwards so it does
        # not leak into later tests.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Put the object in a reference cycle with itself, so that it is
        # still referenced after ``del f``.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1213
1214
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the inherited writer tests against pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001217
class BufferedRWPairTest(unittest.TestCase):
    # Tests for the reader/writer pair class that concrete subclasses bind
    # to ``tp``.  The pair is always built as tp(reader, writer).

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_detach(self):
        # detach() is not supported on a pair.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_deprecation(self):
        # Passing a fourth positional argument must emit the deprecation
        # warning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        # The first (reader) argument must report itself as readable.
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        # The second (writer) argument must report itself as writable.
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        # read(None) must behave like read() with no argument.
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        # Each assertion gets a fresh pair over the same data.
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        # A hint of None must behave like no hint at all.
        self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        # Each write followed by a flush must show up as its own entry on
        # the writer side.
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        # peek() must not consume data: the following read() still sees it.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        # The pair must report a tty as soon as either side does.
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001335
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the inherited pair tests against io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001338
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the inherited pair tests against pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001341
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001342
1343class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1344 read_mode = "rb+"
1345 write_mode = "wb+"
1346
def test_constructor(self):
    # Run the reader constructor checks, then the writer ones, on the same
    # test instance.
    for base in (BufferedReaderTest, BufferedWriterTest):
        base.test_constructor(self)
1350
def test_read_and_write(self):
    # Writes issued between two reads stay buffered until the second read,
    # after which they appear on the raw stream as one chunk.
    raw = self.MockRawIO((b"asdf", b"ghjk"))
    stream = self.tp(raw, 8)

    self.assertEqual(b"as", stream.read(2))
    for piece in (b"ddd", b"eee"):
        stream.write(piece)
    # Nothing may have reached the raw stream yet.
    self.assertFalse(raw._write_stack)
    self.assertEqual(b"ghjk", stream.read())
    first_raw_write = raw._write_stack[0]
    self.assertEqual(b"dddeee", first_raw_write)
Guido van Rossum01a27522007-03-07 01:00:12 +00001361
def test_seek_and_tell(self):
    # Mix reads, a write and seeks with every whence value, checking the
    # reported position along the way.
    stream = self.tp(self.BytesIO(b"asdfghjkl"))

    self.assertEqual(b"as", stream.read(2))
    self.assertEqual(2, stream.tell())
    stream.seek(0, 0)
    self.assertEqual(b"asdf", stream.read(4))

    # Overwrite the four bytes following the ones just read.
    stream.write(b"asdf")
    stream.seek(0, 0)
    self.assertEqual(b"asdfasdfl", stream.read())
    self.assertEqual(9, stream.tell())

    # Relative to the end, then relative to the current position.
    stream.seek(-4, 2)
    self.assertEqual(5, stream.tell())
    stream.seek(2, 1)
    self.assertEqual(7, stream.tell())
    self.assertEqual(b"fl", stream.read(11))

    # A float offset is rejected.
    with self.assertRaises(TypeError):
        stream.seek(0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001381
def check_flush_and_read(self, read_func):
    # Helper: ``read_func(bufio[, n])`` must return bytes read from
    # ``bufio``.  It is exercised around a write, explicit flushes and a
    # change made to the raw stream behind the buffer's back.
    raw = self.BytesIO(b"abcdefghi")
    stream = self.tp(raw)

    self.assertEqual(b"ab", read_func(stream, 2))
    stream.write(b"12")
    # Reading resumes right after the two bytes just written.
    self.assertEqual(b"ef", read_func(stream, 2))
    self.assertEqual(6, stream.tell())
    stream.flush()
    # flush() must not move the position.
    self.assertEqual(6, stream.tell())
    self.assertEqual(b"ghi", read_func(stream))

    # Modify the raw stream directly.
    raw.seek(0, 0)
    raw.write(b"XYZ")
    # flush() resets the read buffer
    stream.flush()
    stream.seek(0, 0)
    self.assertEqual(b"XYZ", read_func(stream, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001399
1400 def test_flush_and_read(self):
1401 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1402
1403 def test_flush_and_readinto(self):
1404 def _readinto(bufio, n=-1):
1405 b = bytearray(n if n >= 0 else 9999)
1406 n = bufio.readinto(b)
1407 return bytes(b[:n])
1408 self.check_flush_and_read(_readinto)
1409
1410 def test_flush_and_peek(self):
1411 def _peek(bufio, n=-1):
1412 # This relies on the fact that the buffer can contain the whole
1413 # raw stream, otherwise peek() can return less.
1414 b = bufio.peek(n)
1415 if n != -1:
1416 b = b[:n]
1417 bufio.seek(len(b), 1)
1418 return b
1419 self.check_flush_and_read(_peek)
1420
1421 def test_flush_and_write(self):
1422 raw = self.BytesIO(b"abcdefghi")
1423 bufio = self.tp(raw)
1424
1425 bufio.write(b"123")
1426 bufio.flush()
1427 bufio.write(b"45")
1428 bufio.flush()
1429 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001430 self.assertEqual(b"12345fghi", raw.getvalue())
1431 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001432
def test_threads(self):
    # Run the thread test of both the reader and the writer test classes
    # against this test case.
    for base in (BufferedReaderTest, BufferedWriterTest):
        base.test_threads(self)
1436
1437 def test_writes_and_peek(self):
1438 def _peek(bufio):
1439 bufio.peek(1)
1440 self.check_writes(_peek)
1441 def _peek(bufio):
1442 pos = bufio.tell()
1443 bufio.seek(-1, 1)
1444 bufio.peek(1)
1445 bufio.seek(pos, 0)
1446 self.check_writes(_peek)
1447
1448 def test_writes_and_reads(self):
1449 def _read(bufio):
1450 bufio.seek(-1, 1)
1451 bufio.read(1)
1452 self.check_writes(_read)
1453
1454 def test_writes_and_read1s(self):
1455 def _read1(bufio):
1456 bufio.seek(-1, 1)
1457 bufio.read1(1)
1458 self.check_writes(_read1)
1459
1460 def test_writes_and_readintos(self):
1461 def _read(bufio):
1462 bufio.seek(-1, 1)
1463 bufio.readinto(bytearray(1))
1464 self.check_writes(_read)
1465
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001466 def test_write_after_readahead(self):
1467 # Issue #6629: writing after the buffer was filled by readahead should
1468 # first rewind the raw stream.
1469 for overwrite_size in [1, 5]:
1470 raw = self.BytesIO(b"A" * 10)
1471 bufio = self.tp(raw, 4)
1472 # Trigger readahead
1473 self.assertEqual(bufio.read(1), b"A")
1474 self.assertEqual(bufio.tell(), 1)
1475 # Overwriting should rewind the raw stream if it needs so
1476 bufio.write(b"B" * overwrite_size)
1477 self.assertEqual(bufio.tell(), overwrite_size + 1)
1478 # If the write size was smaller than the buffer size, flush() and
1479 # check that rewind happens.
1480 bufio.flush()
1481 self.assertEqual(bufio.tell(), overwrite_size + 1)
1482 s = raw.getvalue()
1483 self.assertEqual(s,
1484 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1485
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001486 def test_truncate_after_read_or_write(self):
1487 raw = self.BytesIO(b"A" * 10)
1488 bufio = self.tp(raw, 100)
1489 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1490 self.assertEqual(bufio.truncate(), 2)
1491 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1492 self.assertEqual(bufio.truncate(), 4)
1493
def test_misbehaved_io(self):
    # Run the misbehaved-raw-stream test of both the reader and the
    # writer test classes against this test case.
    for base in (BufferedReaderTest, BufferedWriterTest):
        base.test_misbehaved_io(self)
1497
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001498 # You can't construct a BufferedRandom over a non-seekable stream.
1499 test_unseekable = None
1500
class CBufferedRandomTest(BufferedRandomTest):
    """Run the BufferedRandom tests against io.BufferedRandom."""

    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        # Re-initialising with an absurdly large buffer size must fail.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        expected_errors = (OverflowError, MemoryError, ValueError)
        self.assertRaises(expected_errors,
                          bufio.__init__, rawio, sys.maxsize)

    def test_garbage_collection(self):
        # Run the garbage-collection test of both the reader and the
        # writer test classes against this test case.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)
1517
class PyBufferedRandomTest(BufferedRandomTest):
    """Run the BufferedRandom tests against pyio.BufferedRandom."""

    tp = pyio.BufferedRandom
1520
1521
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001522# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1523# properties:
1524# - A single output character can correspond to many bytes of input.
1525# - The number of input bytes to complete the character can be
1526# undetermined until the last input byte is received.
1527# - The number of input bytes can vary depending on previous input.
1528# - A single input byte can correspond to many characters of output.
1529# - The number of output characters can be undetermined until the
1530# last input byte is received.
1531# - The number of output characters can vary depending on previous input.
1532
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  While the input length I is non-zero,
    every I bytes form one word; while I is 0, words have variable length
    and are terminated by a period, and extra periods are ignored.
    Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        A missing number means 0.
      - 'o' followed by a number sets the output length, O (maximum 99).
        A missing number means 0.
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  EOF (final=True) also terminates the
    last word.
    """

    # lookupTestDecoder() only answers while this flag is true.
    codecEnabled = False

    def __init__(self, errors='strict'):
        super().__init__(errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1, nothing buffered."""
        self.buffer = bytearray()
        self.i = 1
        self.o = 1

    def getstate(self):
        """Return (buffered bytes, flags) with I and O packed into flags."""
        # XOR with 1 so that flags == 0 right after reset().
        flags = (self.i ^ 1) * 100 + (self.o ^ 1)
        return bytes(self.buffer), flags

    def setstate(self, state):
        """Restore a state previously returned by getstate()."""
        pending, flags = state
        self.buffer = bytearray(pending)
        packed_i, packed_o = divmod(flags, 100)
        self.i = packed_i ^ 1
        self.o = packed_o ^ 1

    def decode(self, input, final=False):
        """Feed the bytes of input and return the text of completed words."""
        pieces = []
        period = ord('.')
        for byte in input:
            if self.i:
                # fixed-length, terminate after self.i bytes
                self.buffer.append(byte)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
            elif byte != period:
                # variable-length, still inside a word
                self.buffer.append(byte)
            elif self.buffer:
                # variable-length, a period ends a non-empty word
                pieces.append(self.process_word())
        if final and self.buffer:
            # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Interpret the buffered word, clear the buffer, return its output."""
        word = self.buffer
        marker = word[0]
        if marker == ord('i'):
            self.i = min(99, int(word[1:] or 0))  # set input length
            result = ''
        elif marker == ord('o'):
            self.o = min(99, int(word[1:] or 0))  # set output length
            result = ''
        else:
            text = word.decode('ascii')
            if self.o:
                # pad out with hyphens, then truncate to output length
                text = text.ljust(self.o, '-')[:self.o]
            result = text + '.'
        self.buffer = bytearray()
        return result

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: answer only for an enabled 'test_decoder'."""
        if not cls.codecEnabled or name != 'test_decoder':
            return None
        # Encoding is borrowed from latin-1; only incremental decoding
        # is provided by this class.
        latin1 = codecs.lookup('latin-1')
        return codecs.CodecInfo(
            name='test_decoder', encode=latin1.encode, decode=None,
            incrementalencoder=None,
            streamreader=None, streamwriter=None,
            incrementaldecoder=cls)

# Register the previous decoder for testing.
# Disabled by default, tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1621
1622
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.'
         'b----------------------------.'
         'cde--------------------------.'
         'abcdefghijabcde.'
         'a.b------------.'
         '.c.------------.'
         'd.e------------.'
         'k--------------.'
         'l--------------.'
         'm--------------.'),
    ]

    def test_decoder(self):
        # Try a few one-shot test cases, each with a fresh decoder.
        for data, final, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(data, final), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001665
1666class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001667
def setUp(self):
    # Start every test without a leftover scratch file, and provide a
    # mixed-newline sample together with its newline-normalized text.
    support.unlink(support.TESTFN)
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    normalized_bytes = b"AAA\nBBB\nCCC\nDDD\nEEE\n"
    self.normalized = normalized_bytes.decode("ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001672
def tearDown(self):
    # Remove the scratch file a test may have created.
    scratch_path = support.TESTFN
    support.unlink(scratch_path)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001675
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001676 def test_constructor(self):
1677 r = self.BytesIO(b"\xc3\xa9\n\n")
1678 b = self.BufferedReader(r, 1000)
1679 t = self.TextIOWrapper(b)
1680 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001681 self.assertEqual(t.encoding, "latin1")
1682 self.assertEqual(t.line_buffering, False)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001683 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001684 self.assertEqual(t.encoding, "utf8")
1685 self.assertEqual(t.line_buffering, True)
1686 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001687 self.assertRaises(TypeError, t.__init__, b, newline=42)
1688 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1689
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001690 def test_detach(self):
1691 r = self.BytesIO()
1692 b = self.BufferedWriter(r)
1693 t = self.TextIOWrapper(b)
1694 self.assertIs(t.detach(), b)
1695
1696 t = self.TextIOWrapper(b, encoding="ascii")
1697 t.write("howdy")
1698 self.assertFalse(r.getvalue())
1699 t.detach()
1700 self.assertEqual(r.getvalue(), b"howdy")
1701 self.assertRaises(ValueError, t.detach)
1702
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001703 def test_repr(self):
1704 raw = self.BytesIO("hello".encode("utf-8"))
1705 b = self.BufferedReader(raw)
1706 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001707 modname = self.TextIOWrapper.__module__
1708 self.assertEqual(repr(t),
1709 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1710 raw.name = "dummy"
1711 self.assertEqual(repr(t),
1712 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1713 raw.name = b"dummy"
1714 self.assertEqual(repr(t),
1715 "<%s.TextIOWrapper name=b'dummy' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001716
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001717 def test_line_buffering(self):
1718 r = self.BytesIO()
1719 b = self.BufferedWriter(r, 1000)
1720 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001721 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001722 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001723 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001724 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001725 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001726 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001727
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001728 def test_encoding(self):
1729 # Check the encoding attribute is always set, and valid
1730 b = self.BytesIO()
1731 t = self.TextIOWrapper(b, encoding="utf8")
1732 self.assertEqual(t.encoding, "utf8")
1733 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001734 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001735 codecs.lookup(t.encoding)
1736
1737 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001738 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001739 b = self.BytesIO(b"abc\n\xff\n")
1740 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001741 self.assertRaises(UnicodeError, t.read)
1742 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001743 b = self.BytesIO(b"abc\n\xff\n")
1744 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001745 self.assertRaises(UnicodeError, t.read)
1746 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001747 b = self.BytesIO(b"abc\n\xff\n")
1748 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001749 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001750 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001751 b = self.BytesIO(b"abc\n\xff\n")
1752 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001753 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001754
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001755 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001756 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001757 b = self.BytesIO()
1758 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001759 self.assertRaises(UnicodeError, t.write, "\xff")
1760 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001761 b = self.BytesIO()
1762 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001763 self.assertRaises(UnicodeError, t.write, "\xff")
1764 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001765 b = self.BytesIO()
1766 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001767 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001768 t.write("abc\xffdef\n")
1769 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001770 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001771 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001772 b = self.BytesIO()
1773 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001774 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001775 t.write("abc\xffdef\n")
1776 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001777 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001778
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001779 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001780 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1781
1782 tests = [
1783 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001784 [ '', input_lines ],
1785 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1786 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1787 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001788 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001789 encodings = (
1790 'utf-8', 'latin-1',
1791 'utf-16', 'utf-16-le', 'utf-16-be',
1792 'utf-32', 'utf-32-le', 'utf-32-be',
1793 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001794
Guido van Rossum8358db22007-08-18 21:39:55 +00001795 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001796 # character in TextIOWrapper._pending_line.
1797 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001798 # XXX: str.encode() should return bytes
1799 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001800 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001801 for bufsize in range(1, 10):
1802 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001803 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1804 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001805 encoding=encoding)
1806 if do_reads:
1807 got_lines = []
1808 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001809 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001810 if c2 == '':
1811 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00001812 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001813 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001814 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001815 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001816
1817 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001818 self.assertEqual(got_line, exp_line)
1819 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00001820
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001821 def test_newlines_input(self):
1822 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001823 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1824 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001825 (None, normalized.decode("ascii").splitlines(True)),
1826 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001827 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1828 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1829 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001830 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001831 buf = self.BytesIO(testdata)
1832 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001833 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00001834 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001835 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00001836
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001837 def test_newlines_output(self):
1838 testdict = {
1839 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1840 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1841 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1842 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1843 }
1844 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1845 for newline, expected in tests:
1846 buf = self.BytesIO()
1847 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1848 txt.write("AAA\nB")
1849 txt.write("BB\nCCC\n")
1850 txt.write("X\rY\r\nZ")
1851 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001852 self.assertEqual(buf.closed, False)
1853 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001854
def test_destructor(self):
    # Dropping the last reference to a text wrapper must flush pending
    # text before the underlying stream is closed.
    seen_at_close = []
    base = self.BytesIO

    class MyBytesIO(base):
        def close(self):
            # Record what had reached the raw stream when close() ran.
            seen_at_close.append(self.getvalue())
            base.close(self)

    raw = MyBytesIO()
    t = self.TextIOWrapper(raw, encoding="ascii")
    t.write("abc")
    del t
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001868
def test_override_destructor(self):
    # Destroying an instance of a subclass must go through the overridden
    # __del__, close() and flush(), in that order.
    calls = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # The base class may or may not define __del__.
            inherited = getattr(super(), "__del__", None)
            if inherited is not None:
                inherited()

        def close(self):
            calls.append(2)
            super().close()

        def flush(self):
            calls.append(3)
            super().flush()

    raw = self.BytesIO()
    t = MyTextIO(raw, encoding="ascii")
    del t
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
1891
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()

    def access_missing_attribute():
        # The temporary wrapper is destroyed while the AttributeError is
        # propagating.
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, access_missing_attribute)

    message = captured.getvalue().strip()
    if not message:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(message.splitlines()), 1)
    self.assertTrue(message.startswith("Exception IOError: "), message)
    self.assertTrue(message.endswith(" ignored"), message)
Guido van Rossum8358db22007-08-18 21:39:55 +00001906
Guido van Rossum9b76da62007-04-11 01:09:03 +00001907 # Systematic tests of the text I/O API
1908
def test_basic_io(self):
    # Round-trip a short string through a real file for several chunk
    # sizes and encodings, checking that write/read/seek/tell agree.
    # Both files are opened with "with" so they are closed even when an
    # assertion fails part-way through.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin1", "utf8":  # , "utf-16-be", "utf-16-le":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)

            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position cookie for end-of-file, reused below.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                # Reads at EOF return the empty string.
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                # Append, then read the appended text back via the cookie.
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
1937
def multi_line_test(self, f, enc):
    # Empty the file, write lines of many different lengths while noting
    # tell() before each, then read them back and check that both the
    # lines and the positions match.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    period = len(sample)

    wlines = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to build a line of `size`.
        line = "".join([sample[i % period] for i in range(size)]) + "\n"
        wlines.append((f.tell(), line))
        f.write(line)

    f.seek(0)
    rlines = []
    pos = f.tell()
    line = f.readline()
    while line:
        rlines.append((pos, line))
        pos = f.tell()
        line = f.readline()
    self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001959
def test_telling(self):
    # tell() must report the same positions while reading lines back as
    # it did while writing them, and must refuse to work in the middle of
    # iteration.  The file is opened with "with" so it is closed even
    # when an assertion fails.
    with self.open(support.TESTFN, "w+", encoding="utf8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()

        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)

        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is unavailable while the iterator is active.
            self.assertRaises(IOError, f.tell)
        # Once iteration has finished, tell() works again.
        self.assertEqual(f.tell(), p2)
1979
def test_seeking(self):
    # Place a multi-byte character so that it starts two bytes before the
    # end of the first chunk, then check read(), tell() and readline()
    # around it.
    prefix_size = _default_chunk_size() - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    # The prefix is pure ASCII: one byte per character.
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    line = prefix + u_suffix.encode("utf-8")

    with self.open(support.TESTFN, "wb") as f:
        f.write(line * 2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        head = f.read(prefix_size)
        self.assertEqual(head, prefix.decode("ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00001996
def test_seeking_too(self):
    # Regression test for a specific bug: tell() after readline() when a
    # three-byte character is read with a chunk size of two.
    payload = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as sink:
        sink.write(payload)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002007
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def check_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened with "with" so that it is closed even when
        # an assertion fails.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    # Seeking back to the cookie must resume decoding at
                    # exactly the same character.
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = True

    # Run the tests.
    try:
        # Try each test case.
        for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            check_seek_and_tell_with_data(input)

        # Position each test case so that it crosses a chunk boundary.
        for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(input)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            check_seek_and_tell_with_data(prefix + input, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = False
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002054
def test_encoded_writes(self):
    # Writing twice through a BOM-producing codec must emit the BOM only
    # once (see issue1753).
    data = "1234567890"
    doubled = data * 2
    encodings = ("utf-16",
                 "utf-16-le",
                 "utf-16-be",
                 "utf-32",
                 "utf-32-le",
                 "utf-32-be")
    for encoding in encodings:
        buf = self.BytesIO()
        f = self.TextIOWrapper(buf, encoding=encoding)
        for _ in range(2):
            f.write(data)
        # Read the text back twice, rewinding before each read.
        for _ in range(2):
            f.seek(0)
            self.assertEqual(f.read(), doubled)
        self.assertEqual(buf.getvalue(), doubled.encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002074
def test_unreadable(self):
    # Reading through a wrapper whose buffer reports readable() == False
    # must raise IOError.
    class UnReadable(self.BytesIO):
        def readable(self):
            return False
    txt = self.TextIOWrapper(UnReadable())
    with self.assertRaises(IOError):
        txt.read()
2081
def test_read_one_by_one(self):
    # Read a single character at a time until read() returns "".
    txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    pieces = []
    for char in iter(lambda: txt.read(1), ""):
        pieces.append(char)
    self.assertEqual("".join(pieces), "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002091
def test_readlines(self):
    # readlines() with no hint, a None hint and a size hint of 5.
    all_lines = ["AA\n", "BB\n", "CC"]
    txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
    self.assertEqual(txt.readlines(), all_lines)
    txt.seek(0)
    self.assertEqual(txt.readlines(None), all_lines)
    txt.seek(0)
    self.assertEqual(txt.readlines(5), all_lines[:2])
2099
# read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
def test_read_by_chunk(self):
    # make sure "\r\n" straddles 128 char boundary.
    payload = b"A" * 127 + b"\r\nB"
    txt = self.TextIOWrapper(self.BytesIO(payload))
    pieces = []
    for chunk in iter(lambda: txt.read(128), ""):
        pieces.append(chunk)
    self.assertEqual("".join(pieces), "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002111
def test_issue1395_1(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")

    # read one char at a time, stopping at the first empty read
    pieces = []
    for char in iter(lambda: txt.read(1), ""):
        pieces.append(char)
    self.assertEqual("".join(pieces), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002123
def test_issue1395_2(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # read four chars at a time, stopping at the first empty read
    pieces = []
    for chunk in iter(lambda: txt.read(4), ""):
        pieces.append(chunk)
    self.assertEqual("".join(pieces), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002135
def test_issue1395_3(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # Two fixed-size reads followed by three readline() calls.
    pieces = [txt.read(4), txt.read(4)]
    for _ in range(3):
        pieces.append(txt.readline())
    self.assertEqual("".join(pieces), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002146
def test_issue1395_4(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # A fixed-size read followed by a read of everything that remains.
    head = txt.read(4)
    tail = txt.read()
    self.assertEqual(head + tail, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002154
def test_issue1395_5(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # Consume four chars, remember the position, rewind to the start and
    # then seek back to the remembered position.
    txt.read(4)
    cookie = txt.tell()
    txt.seek(0)
    txt.seek(cookie)
    self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002164
def test_issue2282(self):
    # The wrapper must report the same seekable() value as its buffer.
    raw = self.BytesIO(self.testdata)
    txt = self.TextIOWrapper(raw, encoding="ascii")

    raw_seekable = raw.seekable()
    self.assertEqual(raw_seekable, txt.seekable())
2170
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as f:
            f.write('aaa')
            # The position is not used afterwards; the call is retained.
            f.tell()
        with self.open(path, 'rb') as f:
            written = f.read()
        self.assertEqual(written, 'aaa'.encode(charset))

        with self.open(path, 'a', encoding=charset) as f:
            f.write('xxx')
        with self.open(path, 'rb') as f:
            written = f.read()
        self.assertEqual(written, 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002185
def test_seek_bom(self):
    # Same test, but when seeking manually
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        expected = 'bbbzzz'.encode(charset)
        with self.open(path, 'w', encoding=charset) as f:
            f.write('aaa')
            end_cookie = f.tell()
        with self.open(path, 'r+', encoding=charset) as f:
            # Write at the old end of the text first, then overwrite
            # from the start.
            f.seek(end_cookie)
            f.write('zzz')
            f.seek(0)
            f.write('bbb')
        with self.open(path, 'rb') as f:
            self.assertEqual(f.read(), expected)
Antoine Pitroue4501852009-05-14 18:55:55 +00002200
def test_errors_property(self):
    # The errors attribute is "strict" when no handler is given and
    # otherwise echoes the handler passed to open().
    cases = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for extra_kwargs, expected in cases:
        with self.open(support.TESTFN, "w", **extra_kwargs) as f:
            self.assertEqual(f.errors, expected)
2206
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    thread_count = 20
    start_signal = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def run(n):
            text = "Thread%03d\n" % n
            # Block until every worker has been started.
            start_signal.wait()
            f.write(text)
        workers = []
        for index in range(thread_count):
            workers.append(threading.Thread(target=run, args=(index,)))
        for worker in workers:
            worker.start()
        time.sleep(0.02)
        start_signal.set()
        for worker in workers:
            worker.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
        # Each thread's line must appear exactly once.
        for index in range(thread_count):
            self.assertEqual(content.count("Thread%03d\n" % index), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002228
def test_flush_error_on_close(self):
    # An IOError raised by flush() during close() must propagate.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")

    def failing_flush():
        raise IOError()

    txt.flush = failing_flush
    with self.assertRaises(IOError):  # exception not swallowed
        txt.close()
2235
def test_multi_close(self):
    # close() may be called repeatedly; flush() afterwards raises.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    for _ in range(3):
        txt.close()
    with self.assertRaises(ValueError):
        txt.flush()
2242
def test_unseekable(self):
    # tell() and seek() on a wrapper around MockUnseekableIO must raise
    # UnsupportedOperation.
    txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    with self.assertRaises(self.UnsupportedOperation):
        txt.tell()
    with self.assertRaises(self.UnsupportedOperation):
        txt.seek(0)
2247
class CTextIOWrapperTest(TextIOWrapperTest):

    def test_initialization(self):
        # After a failed re-initialisation, read() must raise ValueError.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        text = self.TextIOWrapper(buffered)
        bad_newlines = (
            (TypeError, 42),
            (ValueError, 'xyzzy'),
        )
        for expected_exc, newline in bad_newlines:
            self.assertRaises(expected_exc, text.__init__, buffered,
                              newline=newline)
            self.assertRaises(ValueError, text.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        rawio = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(rawio)
        text = self.TextIOWrapper(buffered, encoding="ascii")
        text.write("456def")
        # Create a reference cycle through the wrapper itself.
        text.x = text
        ref = weakref.ref(text)
        del text
        support.gc_collect()
        self.assertTrue(ref() is None, ref)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")
2274
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Adds no tests of its own; test_main() attaches the pyio namespace to
    # classes whose name starts with "Py".
    pass
2277
2278
class IncrementalNewlineDecoderTest(unittest.TestCase):

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def _check_decode(b, s, **kwargs):
            # We exercise getstate() / setstate() as well as decode():
            # decode once, restore the saved state, and decode again.
            saved = decoder.getstate()
            self.assertEqual(decoder.decode(b, **kwargs), s)
            decoder.setstate(saved)
            self.assertEqual(decoder.decode(b, **kwargs), s)

        # A three-byte character, whole and then split across calls.
        multibyte_steps = (
            (b'\xe8\xa2\x88', "\u8888"),

            (b'\xe8', ""),
            (b'\xa2', ""),
            (b'\x88', "\u8888"),

            (b'\xe8', ""),
            (b'\xa2', ""),
            (b'\x88', "\u8888"),

            (b'\xe8', ""),
        )
        for raw, text in multibyte_steps:
            _check_decode(raw, text)
        # The sequence is still incomplete, so finalising must fail.
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        final = {"final": True}
        newline_steps = (
            (b'\n', "\n", {}),
            (b'\r', "", {}),
            (b'', "\n", final),
            (b'\r', "\n", final),

            (b'\r', "", {}),
            (b'a', "\na", {}),

            (b'\r\r\n', "\n\n", {}),
            (b'\r', "", {}),
            (b'\r', "\n", {}),
            (b'\na', "\na", {}),

            (b'\xe8\xa2\x88\r\n', "\u8888\n", {}),
            (b'\xe8\xa2\x88', "\u8888", {}),
            (b'\n', "\n", {}),
            (b'\xe8\xa2\x88\r', "\u8888", {}),
            (b'\n', "\n", {}),
        )
        for raw, text, kwargs in newline_steps:
            _check_decode(raw, text, **kwargs)

    def check_newline_decoding(self, decoder, encoding):
        result = []
        if encoding is None:
            encoder = None
        else:
            encoder = codecs.getincrementalencoder(encoding)()

        def _decode_bytewise(s):
            # Feed the decoder one unit at a time: single characters when
            # there is no encoder, single encoded bytes otherwise.
            if encoder is None:
                units = s
            else:
                units = [bytes([b]) for b in encoder.encode(s)]
            for unit in units:
                result.append(decoder.decode(unit))

        self.assertEqual(decoder.newlines, None)
        # (text to feed, value of decoder.newlines expected afterwards)
        steps = (
            ("abc\n\r", '\n'),
            ("\nabc", ('\n', '\r\n')),
            ("abc\r", ('\n', '\r\n')),
            ("abc", ('\r', '\n', '\r\n')),
        )
        for text, seen_newlines in steps:
            _decode_bytewise(text)
            self.assertEqual(decoder.newlines, seen_newlines)
        _decode_bytewise("abc\r")
        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")

        # After reset() the record of seen newlines must be cleared.
        decoder.reset()
        payload = "abc"
        if encoder is not None:
            encoder.reset()
            payload = encoder.encode(payload)
        self.assertEqual(decoder.decode(payload), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc:
                inner = codecs.getincrementaldecoder(enc)()
            else:
                inner = enc
            decoder = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(decoder, enc)
        inner = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding_utf8(decoder)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def _check(dec):
            self.assertEqual(dec.newlines, None)
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)

        for translate in (False, True):
            _check(self.IncrementalNewlineDecoder(None, translate=translate))
2384
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds no tests of its own; test_main() attaches the io namespace to
    # classes whose name starts with "C".
    pass
2387
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds no tests of its own; test_main() attaches the pyio namespace to
    # classes whose name starts with "Py".
    pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002390
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002391
Guido van Rossum01a27522007-03-07 01:00:12 +00002392# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002393
class MiscIOTest(unittest.TestCase):

    def tearDown(self):
        support.unlink(support.TESTFN)

    def test___all__(self):
        # Every public name must exist; exceptions must derive from
        # Exception, and everything else except open() and the SEEK_*
        # constants must derive from IOBase.
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertTrue(obj is not None, name)
            if name == "open":
                continue
            is_error = ("error" in name.lower()
                        or name == "UnsupportedOperation")
            if is_error:
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase))

    def test_attributes(self):
        path = support.TESTFN

        raw = self.open(path, "wb", buffering=0)
        self.assertEqual(raw.mode, "wb")
        raw.close()

        text = self.open(path, "U")
        # The name is visible at every layer of the stack.
        for layer in (text, text.buffer, text.buffer.raw):
            self.assertEqual(layer.name, path)
        self.assertEqual(text.mode, "U")
        self.assertEqual(text.buffer.mode, "rb")
        self.assertEqual(text.buffer.raw.mode, "rb")
        text.close()

        f = self.open(path, "w+")
        self.assertEqual(f.mode, "w+")
        self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
        self.assertEqual(f.buffer.raw.mode, "rb+")

        # A second file object sharing f's descriptor without owning it.
        g = self.open(f.fileno(), "wb", closefd=False)
        self.assertEqual(g.mode, "wb")
        self.assertEqual(g.raw.mode, "wb")
        self.assertEqual(g.name, f.fileno())
        self.assertEqual(g.raw.name, f.fileno())
        f.close()
        g.close()

    def test_io_after_close(self):
        # Every I/O method of a closed file must raise ValueError.
        open_kwargs = [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]
        for kwargs in open_kwargs:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            empty = b"" if "b" in kwargs['mode'] else ""
            # (method name, call arguments, only checked when present)
            checks = (
                ("flush", (), False),
                ("fileno", (), False),
                ("isatty", (), False),
                ("__iter__", (), False),
                ("peek", (1,), True),
                ("read", (), False),
                ("read1", (1024,), True),
                ("readinto", (bytearray(1024),), True),
                ("readline", (), False),
                ("readlines", (), False),
                ("seek", (0,), False),
                ("tell", (), False),
                ("truncate", (), False),
                ("write", (empty,), False),
                ("writelines", ([],), False),
            )
            for method_name, call_args, optional in checks:
                if optional and not hasattr(f, method_name):
                    continue
                self.assertRaises(ValueError, getattr(f, method_name),
                                  *call_args)
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        bad_signatures = ((), (1,), (1, 2, 3, 4), (1, "", None))
        for bad_args in bad_signatures:
            self.assertRaises(TypeError, self.BlockingIOError, *bad_args)
        err = self.BlockingIOError(1, "")
        self.assertEqual(err.characters_written, 0)

        class C(str):
            pass

        # Build a cycle between the exception and its message and check
        # that the cycle is collected.
        message = C("")
        err = self.BlockingIOError(1, message)
        message.b = err
        err.c = message
        ref = weakref.ref(message)
        del message, err
        support.gc_collect()
        self.assertTrue(ref() is None, ref)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        visible_bases = (self.IOBase, self.RawIOBase,
                         self.BufferedIOBase, self.TextIOBase)
        for base in visible_bases:
            self.assertIsInstance(base, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        # For each way of opening the file, list the ABCs the resulting
        # object must be an instance of; it must not be an instance of any
        # other ABC in abc_names.
        abc_names = ("IOBase", "RawIOBase", "BufferedIOBase", "TextIOBase")
        cases = (
            ("wb", {"buffering": 0}, ("IOBase", "RawIOBase")),
            ("wb", {}, ("IOBase", "BufferedIOBase")),
            ("w", {}, ("IOBase", "TextIOBase")),
        )
        for mode, extra_kwargs, expected in cases:
            with self.open(support.TESTFN, mode, **extra_kwargs) as f:
                for abc_name in abc_names:
                    abc_class = getattr(abcmodule, abc_name)
                    if abc_name in expected:
                        self.assertIsInstance(f, abc_class)
                    else:
                        self.assertNotIsInstance(f, abc_class)

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)

    def _check_warn_on_dealloc(self, *args, **kwargs):
        # NOTE(review): this calls the builtin open(), not self.open(), so
        # the C and Py variants run the same code -- confirm this is
        # intentional.
        f = open(*args, **kwargs)
        expected_repr = repr(f)
        with self.assertWarns(ResourceWarning) as cm:
            # Drop the only reference so the file object is deallocated
            # without having been closed.
            f = None
            support.gc_collect()
        self.assertIn(expected_repr, str(cm.warning.args[0]))

    def test_warn_on_dealloc(self):
        for mode, extra_kwargs in (("wb", {"buffering": 0}),
                                   ("wb", {}),
                                   ("w", {})):
            self._check_warn_on_dealloc(support.TESTFN, mode, **extra_kwargs)

    def _check_warn_on_dealloc_fd(self, *args, **kwargs):
        fds = []

        def cleanup_fds():
            # Close every pipe end; ends that were already closed (EBADF)
            # are ignored.
            for fd in fds:
                try:
                    os.close(fd)
                except EnvironmentError as e:
                    if e.errno != errno.EBADF:
                        raise

        self.addCleanup(cleanup_fds)
        r, w = os.pipe()
        fds.extend((r, w))
        self._check_warn_on_dealloc(r, *args, **kwargs)
        # When using closefd=False, there's no warning
        r, w = os.pipe()
        fds.extend((r, w))
        with warnings.catch_warnings(record=True) as recorded:
            # NOTE(review): builtin open() again rather than self.open().
            open(r, *args, closefd=False, **kwargs)
            support.gc_collect()
        self.assertEqual(recorded, [])

    def test_warn_on_dealloc_fd(self):
        for mode, extra_kwargs in (("rb", {"buffering": 0}),
                                   ("rb", {}),
                                   ("r", {})):
            self._check_warn_on_dealloc_fd(mode, **extra_kwargs)

    def test_pickling(self):
        # Pickling file objects is forbidden
        open_kwargs = [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+b", "buffering": 0},
            ]
        protocols = range(pickle.HIGHEST_PROTOCOL + 1)
        for kwargs in open_kwargs:
            for protocol in protocols:
                with self.open(support.TESTFN, **kwargs) as f:
                    self.assertRaises(TypeError, pickle.dumps, f, protocol)
2586
class CMiscIOTest(MiscIOTest):
    # Module under test, read by the inherited tests as self.io.
    io = io
2589
class PyMiscIOTest(MiscIOTest):
    # Module under test, read by the inherited tests as self.io.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002592
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002593
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):

    def setUp(self):
        # Install a SIGALRM handler that raises; the previous handler is
        # saved so that tearDown() can restore it.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Deliberately raises ZeroDivisionError so the tests can detect
        # that the handler ran.
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler.

        *item* is the unit of data repeated to build the payload written
        to the pipe, *bytes* is the byte sequence whose first two bytes
        are expected on the read end, and *fdopen_kwargs* are passed to
        self.io.open() for the write end (closefd is forced to False).
        """
        read_results = []
        def _read():
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # Bound before the try block so that the cleanup below does not
        # hit a NameError (hiding the real error) if open() raises.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            signal.alarm(1)
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed above.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            self.assertRaises(ZeroDivisionError,
                        wio.write, item * (1024 * 1024))
            t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            # Cancel the alarm if it has not fired yet, so that it cannot
            # go off after tearDown() has restored the previous handler.
            signal.alarm(0)
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2655
class CSignalsTest(SignalsTest):
    # Module under test, read by the inherited tests as self.io.
    io = io
2658
class PySignalsTest(SignalsTest):
    # Module under test, read by the inherited tests as self.io.
    io = pyio
2661
2662
def test_main():
    """Attach the io or pyio namespace to each test class and run them."""
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = dict((name, getattr(io, name)) for name in all_members)
    py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
    globs = globals()
    # Each mock is stored under its plain name but resolved to the
    # "C"- or "Py"-prefixed class defined at module level.
    for mock in mocks:
        mock_name = mock.__name__
        c_io_ns[mock_name] = globs["C" + mock_name]
        py_io_ns[mock_name] = globs["Py" + mock_name]
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        test_name = test.__name__
        if test_name.startswith("C"):
            namespace = c_io_ns
        elif test_name.startswith("Py"):
            namespace = py_io_ns
        else:
            # Classes with neither prefix get no namespace attached.
            continue
        for name, obj in namespace.items():
            setattr(test, name, obj)

    support.run_unittest(*tests)

if __name__ == "__main__":
    test_main()