blob: aa1679b973222b3267979854a025dcb73a0c7d4b [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)

################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import random
Guido van Rossum28524c72007-02-27 05:47:44 +000027import unittest
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000028import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000029import abc
Georg Brandl1b37e872010-03-14 10:45:50 +000030from itertools import cycle, count
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000031from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000032from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000033
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000034import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000035import io # C implementation of io
36import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000037try:
38 import threading
39except ImportError:
40 threading = None
Guido van Rossum28524c72007-02-27 05:47:44 +000041
Guido van Rossuma9e20242007-03-08 00:43:48 +000042
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000043def _default_chunk_size():
44 """Get the default TextIOWrapper chunk size"""
45 with open(__file__, "r", encoding="latin1") as f:
46 return f._CHUNK_SIZE
47
48
class MockRawIO:
    """Scripted raw stream used in place of a real raw file.

    Reads are served from a list of pre-supplied chunks and every write is
    recorded, so a test can inspect what the layer above actually did.
    """

    def __init__(self, read_stack=()):
        # Chunks still to be handed out by read()/readinto(), in order.
        self._read_stack = list(read_stack)
        # Every payload passed to write(), converted to bytes.
        self._write_stack = []
        # Number of read()/readinto() calls made so far.
        self._reads = 0

    def read(self, n=None):
        """Return the next scripted chunk, or b"" once none are left.

        The size argument *n* is ignored.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Script exhausted: behave like end of file.
            return b""

    def write(self, b):
        """Record a bytes copy of *b* and return len(b)."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the next scripted chunk into *buf*.

        Returns the number of bytes copied, 0 when the script is exhausted,
        or None when the next scripted item is None (that item is consumed).
        A chunk larger than *buf* is split: *buf* is filled and the remainder
        stays at the front of the script for the next call.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            del self._read_stack[0]
            buf[:n] = data
            return n
        buf[:] = data[:max_len]
        self._read_stack[0] = data[max_len:]
        return max_len

    def truncate(self, pos=None):
        """Do nothing and return *pos* unchanged."""
        return pos
107
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO combined with RawIOBase from the C io module."""
110
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO combined with RawIOBase from the pure-Python _pyio module."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000113
Guido van Rossuma9e20242007-03-08 00:43:48 +0000114
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods report deliberately bogus results."""

    def write(self, b):
        # Report twice as many bytes as were actually recorded.
        return super().write(b) * 2

    def read(self, n=None):
        # Return the scripted chunk repeated twice.
        return super().read(n) * 2

    def seek(self, pos, whence):
        # Negative stream position.
        return -123

    def tell(self):
        # Negative stream position.
        return -456

    def readinto(self, buf):
        # Perform the real copy, then claim five times the buffer length.
        super().readinto(buf)
        return len(buf) * 5
131
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO combined with RawIOBase from the C io module."""
134
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO combined with RawIOBase from the _pyio module."""
137
138
class CloseFailureIO(MockRawIO):
    """MockRawIO variant whose first close() call fails.

    The first call marks the object as closed and then raises IOError;
    later calls do nothing.
    """

    closed = 0

    def close(self):
        if not self.closed:
            # Flip the flag before raising so the failure happens only once.
            self.closed = 1
            raise IOError
146
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO combined with RawIOBase from the C io module."""
149
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO combined with RawIOBase from the _pyio module."""
152
153
class MockFileIO:
    """Mixin that logs the size of every read()/readinto() result.

    It must be combined with a class providing the real stream behaviour
    (the subclasses below use BytesIO); every call is forwarded there via
    super().
    """

    def __init__(self, data):
        # One entry per read()/readinto() call: the number of bytes returned,
        # or None when the underlying call returned None.
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        """Forward to the next class's read() and log the result length."""
        res = super().read(n)
        self.read_history.append(None if res is None else len(res))
        return res

    def readinto(self, b):
        """Forward to the next class's readinto() and log its return value."""
        res = super().readinto(b)
        self.read_history.append(res)
        return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000169
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO layered over BytesIO from the C io module."""
Guido van Rossuma9e20242007-03-08 00:43:48 +0000172
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO layered over BytesIO from the pure-Python _pyio module."""
175
176
class MockNonBlockWriterIO:
    """Write-recording raw stream that can simulate a blocked write.

    Subclasses must provide a ``BlockingIOError`` class attribute; write()
    raises it when the configured blocker character is found.
    """

    def __init__(self):
        # Chunks accepted by write() since the last pop_written().
        self._write_stack = []
        # Byte sequence that makes the next matching write() block, or None.
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far as one bytes object and reset."""
        s = b"".join(self._write_stack)
        self._write_stack[:] = []
        return s

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Record *b*, or block partway if it contains the blocker char.

        When a blocker is set and occurs in *b*, the blocker is cleared, the
        bytes before its first occurrence are recorded, and
        self.BlockingIOError(0, "test blocking", n) is raised with n being
        the number of bytes recorded.  Otherwise all of *b* is recorded and
        len(b) is returned.
        """
        b = bytes(b)
        if self._blocker_char:
            try:
                n = b.index(self._blocker_char)
            except ValueError:
                # Blocker not in this payload: accept it in full below.
                pass
            else:
                # The blocker fires once; the next write goes through.
                self._blocker_char = None
                self._write_stack.append(b[:n])
                raise self.BlockingIOError(0, "test blocking", n)
        self._write_stack.append(b)
        return len(b)
215
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO raising the C io module's BlockingIOError."""

    BlockingIOError = io.BlockingIOError
218
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO raising the _pyio module's BlockingIOError."""

    BlockingIOError = pyio.BlockingIOError
221
Guido van Rossuma9e20242007-03-08 00:43:48 +0000222
class IOTest(unittest.TestCase):
    # Implementation-independent tests of the io module.  The attributes used
    # below (self.open, self.FileIO, self.BytesIO, self.IOBase, ...) are not
    # defined in this class; presumably they are bound to the io or _pyio
    # implementation on the CIOTest/PyIOTest subclasses elsewhere in the
    # file -- TODO confirm.
    #
    # Test methods carry comments rather than docstrings so that unittest's
    # verbose output keeps showing the method names.

    def setUp(self):
        # Start every test without a leftover scratch file.
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        """Exercise write/seek/tell/truncate on the writable binary stream f.

        When all checks pass, the stream content is b"hello world\\n".
        """
        self.assertEqual(f.write(b"blah."), 5)
        f.truncate(0)
        # truncate() is expected to leave the stream position alone.
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        # Again, truncating must not move the position.
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Exercise read/readinto/seek/tell on a stream holding
        b"hello world\\n".

        With *buffered* true, argument-less read() calls are checked as well.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        # Only two bytes remain; the rest of the buffer keeps its old content.
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by the large-file checks.
    LARGE = 2**31

    def large_file_ops(self, f):
        """Exercise seek/tell/write/truncate around offset self.LARGE on f."""
        # Use unittest assertions: bare assert statements vanish under -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        # truncate() to a smaller size leaves the position where it was.
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(IOError, fp.read)
                self.assertRaises(IOError, fp.readline)
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(IOError, fp.write, b"blah")
            self.assertRaises(IOError, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(IOError, fp.write, "blah")
            self.assertRaises(IOError, fp.writelines, ["blah\n"])

    def test_raw_file_io(self):
        # Unbuffered file: write it with write_ops(), re-read with read_ops().
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        # Same as test_raw_file_io, but with default buffering.
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space,
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                # Report a real skip (as test_unbounded_file does) instead of
                # printing to stderr and silently passing.
                self.skipTest(
                    "Testing large file ops skipped on %s. "
                    "It requires %d bytes and a long time. "
                    "Use 'regrtest.py -u largefile test_io' to run it."
                    % (sys.platform, self.LARGE))
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # The file must be closed on leaving the with block, both on normal
        # exit and when the body raises.
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        # Dropping the last reference must run __del__, close() and flush()
        # in that order (record == [1, 2, 3]) and the data must reach disk.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                # The base class may or may not define __del__.
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        """Check that destroying a bare subclass of *base* calls __del__,
        close() and flush() in that order, with instance attributes still
        readable in each of them.
        """
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        a = array.array('i', range(10))
        # Size of the array's raw byte representation.
        n = len(a) * a.itemsize
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            # Second file object sharing f's descriptor without owning it.
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r",
                          closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        # Self-reference: the object sits in a cycle.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed

    def test_multi_close(self):
        # Closing repeatedly is allowed; flushing a closed file is not.
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)
552
class CIOTest(IOTest):
    # Runs the IOTest cases; presumably bound to the C io implementation
    # elsewhere in the file -- TODO confirm.
    pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000555
class PyIOTest(IOTest):
    # Runs the IOTest cases; presumably bound to the pure-Python _pyio
    # implementation elsewhere in the file -- TODO confirm.
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000558
Guido van Rossuma9e20242007-03-08 00:43:48 +0000559
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    #
    # Mixin: the concrete test class supplies self.tp (the buffered type under
    # test) and the mock raw classes (self.MockRawIO, self.CloseFailureIO).

    def test_detach(self):
        raw = self.MockRawIO()
        buf = self.tp(raw)
        self.assertIs(buf.detach(), raw)
        # A second detach() has nothing left to detach.
        self.assertRaises(ValueError, buf.detach)

    def test_fileno(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        # 42 is what MockRawIO.fileno() returns.
        self.assertEqual(42, bufio.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Invalid whence
        self.assertRaises(ValueError, bufio.seek, 0, -1)
        self.assertRaises(ValueError, bufio.seek, 0, 3)

    def test_override_destructor(self):
        # Destroying the object must call the overridden __del__ and close(),
        # plus flush() when the buffered object is writable.
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                # The base class may or may not define __del__.
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        if writable:
            self.assertEqual(record, [1, 2, 3])
        else:
            self.assertEqual(record, [1, 2])

    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        def _with():
            with bufio:
                pass
        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            self.tp(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception IOError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    def test_repr(self):
        # repr() shows the type, plus the raw stream's name once it has one.
        raw = self.MockRawIO()
        b = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(b), "<%s>" % clsname)
        raw.name = "dummy"
        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)

    def test_flush_error_on_close(self):
        raw = self.MockRawIO()
        def bad_flush():
            raise IOError()
        raw.flush = bad_flush
        b = self.tp(raw)
        self.assertRaises(IOError, b.close) # exception not swallowed

    def test_multi_close(self):
        # Closing repeatedly is allowed; flushing a closed object is not.
        raw = self.MockRawIO()
        b = self.tp(raw)
        b.close()
        b.close()
        b.close()
        self.assertRaises(ValueError, b.flush)
667
Guido van Rossum78892e42007-04-06 17:31:18 +0000668
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    """Shared tests for the buffered reader type bound to ``self.tp``.

    The type under test, the mock raw streams and ``open`` are all looked
    up on ``self``, so subclasses choose which implementation is exercised.
    """
    read_mode = "rb"

    def test_constructor(self):
        # __init__ may be called again on an existing object; a non-positive
        # buffer_size must raise ValueError, and a later valid __init__ must
        # leave the object usable.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_read(self):
        # read(None) and read(7) must both return the whole 7-byte stream.
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # rawio._reads is the mock's counter (presumably the number of raw
        # read calls issued so far); it is checked after each read1() call.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        # Only one byte is left: the second byte of b keeps its old value.
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        # At EOF nothing is written into b.
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")

    def test_readlines(self):
        # Each call gets a fresh reader so the hints are tested independently.
        def bufio():
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: [buffer size, sizes passed to bufio.read(),
        #              expected rawio.read_history afterwards].
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

    def test_read_past_eof(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            l = list(range(256)) * N
            random.shuffle(l)
            s = bytes(bytearray(l))
            with self.open(support.TESTFN, "wb") as f:
                f.write(s)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def f():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            s = bufio.read(n)
                            if not s:
                                break
                            # list.append() is atomic
                            results.append(s)
                    except Exception as e:
                        # Record the failure so the main thread can report it.
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                # Every byte value must have been read exactly N times.
                s = b''.join(results)
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(s.count(c), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # seek() and tell() on top of MisbehavedRawIO must raise IOError.
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
825 self.assertRaises(IOError, bufio.tell)
826
class CBufferedReaderTest(BufferedReaderTest):
    """Runs the shared reader tests, plus extra checks, on io.BufferedReader."""
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After every failed __init__ the object must refuse to read.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # Nothing in this method removes the file it creates, so schedule
        # its removal for when the test finishes.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Create a reference cycle so that only the cyclic GC can free f.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000867
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the shared BufferedReaderTest cases against pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000870
Guido van Rossuma9e20242007-03-08 00:43:48 +0000871
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    """Shared tests for the buffered writer type bound to ``self.tp``.

    The type under test, the mock raw streams and ``open`` are all looked
    up on ``self``, so subclasses choose which implementation is exercised.
    """
    write_mode = "wb"

    def test_constructor(self):
        # __init__ may be called again on an existing object; a non-positive
        # buffer_size must raise ValueError, and a later valid __init__ must
        # leave the object usable.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # detach() must push buffered data down to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # intermediate_func(bufio) is called after every write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seek away and back (absolute, then relative) between writes.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_destructor(self):
        # Dropping the last reference must flush buffered data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            # truncate() does not move the stream position.
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        # Record the failure so the main thread can report it.
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            # Chunks may land in any order, but every byte value must
            # appear exactly N times.
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # seek(), tell() and an over-sized write on top of MisbehavedRawIO
        # must raise IOError.
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing a third positional argument must emit a DeprecationWarning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001086
1087
class CBufferedWriterTest(BufferedWriterTest):
    """Runs the shared writer tests, plus extra checks, on io.BufferedWriter."""
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After every failed __init__ the object must refuse to write.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        # Nothing in this method removes the file it creates, so schedule
        # its removal for when the test finishes.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Create a reference cycle so that only the cyclic GC can free f.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1125
1126
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriterTest cases against pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001129
class BufferedRWPairTest(unittest.TestCase):
    """Shared tests for the reader/writer pair type bound to ``self.tp``.

    ``self.tp`` is always called as ``tp(reader, writer, ...)``; the raw
    stream classes used to build the pair are looked up on ``self``.
    """

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_detach(self):
        # A pair has no single underlying stream to hand back.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_deprecation(self):
        # Passing a fourth positional argument must emit a DeprecationWarning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        # read(None) must behave like read() on a fresh pair.
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        # Each call gets a fresh pair so the hints are tested independently.
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        # This assertion used to repeat the no-argument call verbatim; check
        # an explicit None hint instead, as the buffered reader tests do.
        self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        # Each write/flush round must reach the writer as a separate chunk.
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        # peek() must not consume data: the following read() sees it again.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        # The pair must report a tty when either side does.
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001247
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPairTest cases against io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001250
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPairTest cases against pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001253
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001254
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    """Shared tests for the read/write buffered type bound to ``self.tp``.

    Inherits every reader and writer test and adds tests that interleave
    reads, writes, seeks and flushes on the same object.
    """
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        # The following read() makes the pending writes reach the raw stream.
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        # A float offset is rejected.
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # read_func(bufio[, n]) must return the next n bytes (or the rest
        # of the stream when n is omitted) and advance the position.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # With no size given, use a buffer larger than the whole stream.
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            # peek() does not advance the position, so do it by hand.
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        # First peek at the current position, then one byte before it.
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)
        def _peek(bufio):
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek)

    def test_writes_and_reads(self):
        # Step back one byte and read it again after every write.
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            s = raw.getvalue()
            self.assertEqual(s,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)
1408 BufferedWriterTest.test_misbehaved_io(self)
1409
class CBufferedRandomTest(BufferedRandomTest):
    """Runs the shared random-access tests on io.BufferedRandom."""
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # On 32-bit builds the huge allocation can actually succeed (e.g.
        # more than 2GB RAM under a 64-bit kernel), so only check larger
        # builds.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        acceptable = (OverflowError, MemoryError, ValueError)
        self.assertRaises(acceptable, buffered.__init__, raw, sys.maxsize)

    def test_garbage_collection(self):
        # Reuse both C-specific collection checks, reader first.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)
1425 CBufferedWriterTest.test_garbage_collection(self)
1426
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the shared BufferedRandomTest cases against pyio.BufferedRandom.
    tp = pyio.BufferedRandom
1429
1430
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001431# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1432# properties:
1433# - A single output character can correspond to many bytes of input.
1434# - The number of input bytes to complete the character can be
1435# undetermined until the last input byte is received.
1436# - The number of input bytes can vary depending on previous input.
1437# - A single input byte can correspond to many characters of output.
1438# - The number of output characters can be undetermined until the
1439# last input byte is received.
1440# - The number of output characters can vary depending on previous input.
1441
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  A change to I or O applies to the
    words that follow it.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I == O == 1 and nothing buffered."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags) with I and O packed into flags."""
        # XOR with 1 so that flags == 0 right after reset().
        i, o = self.i ^ 1, self.o ^ 1
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a (buffered bytes, flags) pair produced by getstate()."""
        buffered, flags = state
        self.buffer = bytearray(buffered)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Consume the bytes of *input* and return the words they complete.

        If *final* is true, a partially buffered word is emitted as well.
        """
        pieces = []
        for b in input:
            if self.i == 0:  # variable-length, terminated with period
                if b == ord('.'):
                    if self.buffer:
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else:  # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer:  # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Interpret the buffered word, clear the buffer, return its output.

        Control words ('i...' and 'o...') update I or O and produce no
        output; a missing number is treated as 0.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o  # pad out with hyphens
            if self.o:
                output = output[:self.o]  # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # Tests flip this on only while they need the 'test_decoder' codec.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: resolve 'test_decoder' while enabled.

        Returns None for every other name, or when codecEnabled is false.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None
1526
# Install the test decoder's search function in the codec registry.
# It resolves nothing until a test sets codecEnabled on the class.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1530
1531
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded output).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for input, eof, output in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(input, eof), output)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', True), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001574
1575class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001576
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001577 def setUp(self):
1578 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1579 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001580 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001581
Guido van Rossumd0712812007-04-11 16:32:43 +00001582 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001583 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001584
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001585 def test_constructor(self):
1586 r = self.BytesIO(b"\xc3\xa9\n\n")
1587 b = self.BufferedReader(r, 1000)
1588 t = self.TextIOWrapper(b)
1589 t.__init__(b, encoding="latin1", newline="\r\n")
1590 self.assertEquals(t.encoding, "latin1")
1591 self.assertEquals(t.line_buffering, False)
1592 t.__init__(b, encoding="utf8", line_buffering=True)
1593 self.assertEquals(t.encoding, "utf8")
1594 self.assertEquals(t.line_buffering, True)
1595 self.assertEquals("\xe9\n", t.readline())
1596 self.assertRaises(TypeError, t.__init__, b, newline=42)
1597 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1598
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001599 def test_detach(self):
1600 r = self.BytesIO()
1601 b = self.BufferedWriter(r)
1602 t = self.TextIOWrapper(b)
1603 self.assertIs(t.detach(), b)
1604
1605 t = self.TextIOWrapper(b, encoding="ascii")
1606 t.write("howdy")
1607 self.assertFalse(r.getvalue())
1608 t.detach()
1609 self.assertEqual(r.getvalue(), b"howdy")
1610 self.assertRaises(ValueError, t.detach)
1611
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001612 def test_repr(self):
1613 raw = self.BytesIO("hello".encode("utf-8"))
1614 b = self.BufferedReader(raw)
1615 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001616 modname = self.TextIOWrapper.__module__
1617 self.assertEqual(repr(t),
1618 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1619 raw.name = "dummy"
1620 self.assertEqual(repr(t),
1621 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1622 raw.name = b"dummy"
1623 self.assertEqual(repr(t),
1624 "<%s.TextIOWrapper name=b'dummy' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001625
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001626 def test_line_buffering(self):
1627 r = self.BytesIO()
1628 b = self.BufferedWriter(r, 1000)
1629 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001630 t.write("X")
1631 self.assertEquals(r.getvalue(), b"") # No flush happened
1632 t.write("Y\nZ")
1633 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
1634 t.write("A\rB")
1635 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
1636
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001637 def test_encoding(self):
1638 # Check the encoding attribute is always set, and valid
1639 b = self.BytesIO()
1640 t = self.TextIOWrapper(b, encoding="utf8")
1641 self.assertEqual(t.encoding, "utf8")
1642 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001643 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001644 codecs.lookup(t.encoding)
1645
1646 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001647 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001648 b = self.BytesIO(b"abc\n\xff\n")
1649 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001650 self.assertRaises(UnicodeError, t.read)
1651 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001652 b = self.BytesIO(b"abc\n\xff\n")
1653 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001654 self.assertRaises(UnicodeError, t.read)
1655 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001656 b = self.BytesIO(b"abc\n\xff\n")
1657 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001658 self.assertEquals(t.read(), "abc\n\n")
1659 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001660 b = self.BytesIO(b"abc\n\xff\n")
1661 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001662 self.assertEquals(t.read(), "abc\n\ufffd\n")
1663
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001664 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001665 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001666 b = self.BytesIO()
1667 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001668 self.assertRaises(UnicodeError, t.write, "\xff")
1669 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001670 b = self.BytesIO()
1671 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001672 self.assertRaises(UnicodeError, t.write, "\xff")
1673 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001674 b = self.BytesIO()
1675 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001676 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001677 t.write("abc\xffdef\n")
1678 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001679 self.assertEquals(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001680 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001681 b = self.BytesIO()
1682 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001683 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001684 t.write("abc\xffdef\n")
1685 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001686 self.assertEquals(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001687
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001688 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001689 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1690
1691 tests = [
1692 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001693 [ '', input_lines ],
1694 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1695 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1696 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001697 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001698 encodings = (
1699 'utf-8', 'latin-1',
1700 'utf-16', 'utf-16-le', 'utf-16-be',
1701 'utf-32', 'utf-32-le', 'utf-32-be',
1702 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001703
Guido van Rossum8358db22007-08-18 21:39:55 +00001704 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001705 # character in TextIOWrapper._pending_line.
1706 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001707 # XXX: str.encode() should return bytes
1708 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001709 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001710 for bufsize in range(1, 10):
1711 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001712 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1713 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001714 encoding=encoding)
1715 if do_reads:
1716 got_lines = []
1717 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001718 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001719 if c2 == '':
1720 break
1721 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001722 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001723 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001724 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001725
1726 for got_line, exp_line in zip(got_lines, exp_lines):
1727 self.assertEquals(got_line, exp_line)
1728 self.assertEquals(len(got_lines), len(exp_lines))
1729
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001730 def test_newlines_input(self):
1731 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001732 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1733 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001734 (None, normalized.decode("ascii").splitlines(True)),
1735 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001736 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1737 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1738 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001739 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001740 buf = self.BytesIO(testdata)
1741 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +00001742 self.assertEquals(txt.readlines(), expected)
1743 txt.seek(0)
1744 self.assertEquals(txt.read(), "".join(expected))
1745
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001746 def test_newlines_output(self):
1747 testdict = {
1748 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1749 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1750 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1751 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1752 }
1753 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1754 for newline, expected in tests:
1755 buf = self.BytesIO()
1756 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1757 txt.write("AAA\nB")
1758 txt.write("BB\nCCC\n")
1759 txt.write("X\rY\r\nZ")
1760 txt.flush()
1761 self.assertEquals(buf.closed, False)
1762 self.assertEquals(buf.getvalue(), expected)
1763
1764 def test_destructor(self):
1765 l = []
1766 base = self.BytesIO
1767 class MyBytesIO(base):
1768 def close(self):
1769 l.append(self.getvalue())
1770 base.close(self)
1771 b = MyBytesIO()
1772 t = self.TextIOWrapper(b, encoding="ascii")
1773 t.write("abc")
1774 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001775 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001776 self.assertEquals([b"abc"], l)
1777
1778 def test_override_destructor(self):
1779 record = []
1780 class MyTextIO(self.TextIOWrapper):
1781 def __del__(self):
1782 record.append(1)
1783 try:
1784 f = super().__del__
1785 except AttributeError:
1786 pass
1787 else:
1788 f()
1789 def close(self):
1790 record.append(2)
1791 super().close()
1792 def flush(self):
1793 record.append(3)
1794 super().flush()
1795 b = self.BytesIO()
1796 t = MyTextIO(b, encoding="ascii")
1797 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001798 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001799 self.assertEqual(record, [1, 2, 3])
1800
1801 def test_error_through_destructor(self):
1802 # Test that the exception state is not modified by a destructor,
1803 # even if close() fails.
1804 rawio = self.CloseFailureIO()
1805 def f():
1806 self.TextIOWrapper(rawio).xyzzy
1807 with support.captured_output("stderr") as s:
1808 self.assertRaises(AttributeError, f)
1809 s = s.getvalue().strip()
1810 if s:
1811 # The destructor *may* have printed an unraisable error, check it
1812 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001813 self.assertTrue(s.startswith("Exception IOError: "), s)
1814 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00001815
Guido van Rossum9b76da62007-04-11 01:09:03 +00001816 # Systematic tests of the text I/O API
1817
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001818 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001819 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1820 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001821 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001822 f._CHUNK_SIZE = chunksize
1823 self.assertEquals(f.write("abc"), 3)
1824 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001825 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001826 f._CHUNK_SIZE = chunksize
1827 self.assertEquals(f.tell(), 0)
1828 self.assertEquals(f.read(), "abc")
1829 cookie = f.tell()
1830 self.assertEquals(f.seek(0), 0)
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001831 self.assertEquals(f.read(None), "abc")
1832 f.seek(0)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001833 self.assertEquals(f.read(2), "ab")
1834 self.assertEquals(f.read(1), "c")
1835 self.assertEquals(f.read(1), "")
1836 self.assertEquals(f.read(), "")
1837 self.assertEquals(f.tell(), cookie)
1838 self.assertEquals(f.seek(0), 0)
1839 self.assertEquals(f.seek(0, 2), cookie)
1840 self.assertEquals(f.write("def"), 3)
1841 self.assertEquals(f.seek(cookie), cookie)
1842 self.assertEquals(f.read(), "def")
1843 if enc.startswith("utf"):
1844 self.multi_line_test(f, enc)
1845 f.close()
1846
1847 def multi_line_test(self, f, enc):
1848 f.seek(0)
1849 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001850 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001851 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00001852 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001853 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00001854 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001855 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001856 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001857 wlines.append((f.tell(), line))
1858 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001859 f.seek(0)
1860 rlines = []
1861 while True:
1862 pos = f.tell()
1863 line = f.readline()
1864 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00001865 break
1866 rlines.append((pos, line))
Guido van Rossum9b76da62007-04-11 01:09:03 +00001867 self.assertEquals(rlines, wlines)
1868
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001869 def test_telling(self):
1870 f = self.open(support.TESTFN, "w+", encoding="utf8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001871 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001872 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001873 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001874 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001875 p2 = f.tell()
1876 f.seek(0)
1877 self.assertEquals(f.tell(), p0)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001878 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001879 self.assertEquals(f.tell(), p1)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001880 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001881 self.assertEquals(f.tell(), p2)
1882 f.seek(0)
1883 for line in f:
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001884 self.assertEquals(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001885 self.assertRaises(IOError, f.tell)
1886 self.assertEquals(f.tell(), p2)
1887 f.close()
1888
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001889 def test_seeking(self):
1890 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00001891 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001892 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00001893 prefix = bytes(u_prefix.encode("utf-8"))
1894 self.assertEquals(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001895 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00001896 suffix = bytes(u_suffix.encode("utf-8"))
1897 line = prefix + suffix
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001898 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001899 f.write(line*2)
1900 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001901 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001902 s = f.read(prefix_size)
Guido van Rossum98297ee2007-11-06 21:34:58 +00001903 self.assertEquals(s, str(prefix, "ascii"))
Guido van Rossumd76e7792007-04-17 02:38:04 +00001904 self.assertEquals(f.tell(), prefix_size)
1905 self.assertEquals(f.readline(), u_suffix)
1906
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001907 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00001908 # Regression test for a specific bug
1909 data = b'\xe0\xbf\xbf\n'
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001910 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001911 f.write(data)
1912 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001913 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001914 f._CHUNK_SIZE # Just test that it exists
1915 f._CHUNK_SIZE = 2
1916 f.readline()
1917 f.tell()
1918
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001919 def test_seek_and_tell(self):
1920 #Test seek/tell using the StatefulIncrementalDecoder.
1921 # Make test faster by doing smaller seeks
1922 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001923
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001924 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001925 """Tell/seek to various points within a data stream and ensure
1926 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001927 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001928 f.write(data)
1929 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001930 f = self.open(support.TESTFN, encoding='test_decoder')
1931 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001932 decoded = f.read()
1933 f.close()
1934
Neal Norwitze2b07052008-03-18 19:52:05 +00001935 for i in range(min_pos, len(decoded) + 1): # seek positions
1936 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001937 f = self.open(support.TESTFN, encoding='test_decoder')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001938 self.assertEquals(f.read(i), decoded[:i])
1939 cookie = f.tell()
1940 self.assertEquals(f.read(j), decoded[i:i + j])
1941 f.seek(cookie)
1942 self.assertEquals(f.read(), decoded[i:])
1943 f.close()
1944
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001945 # Enable the test decoder.
1946 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001947
1948 # Run the tests.
1949 try:
1950 # Try each test case.
1951 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001952 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001953
1954 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001955 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1956 offset = CHUNK_SIZE - len(input)//2
1957 prefix = b'.'*offset
1958 # Don't bother seeking into the prefix (takes too long).
1959 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001960 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001961
1962 # Ensure our test decoder won't interfere with subsequent tests.
1963 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001964 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001965
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001966 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001967 data = "1234567890"
1968 tests = ("utf-16",
1969 "utf-16-le",
1970 "utf-16-be",
1971 "utf-32",
1972 "utf-32-le",
1973 "utf-32-be")
1974 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001975 buf = self.BytesIO()
1976 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001977 # Check if the BOM is written only once (see issue1753).
1978 f.write(data)
1979 f.write(data)
1980 f.seek(0)
1981 self.assertEquals(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00001982 f.seek(0)
1983 self.assertEquals(f.read(), data * 2)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001984 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1985
Benjamin Petersona1b49012009-03-31 23:11:32 +00001986 def test_unreadable(self):
1987 class UnReadable(self.BytesIO):
1988 def readable(self):
1989 return False
1990 txt = self.TextIOWrapper(UnReadable())
1991 self.assertRaises(IOError, txt.read)
1992
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001993 def test_read_one_by_one(self):
1994 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001995 reads = ""
1996 while True:
1997 c = txt.read(1)
1998 if not c:
1999 break
2000 reads += c
2001 self.assertEquals(reads, "AA\nBB")
2002
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002003 def test_readlines(self):
2004 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2005 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2006 txt.seek(0)
2007 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2008 txt.seek(0)
2009 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2010
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002011 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002012 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002013 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002014 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002015 reads = ""
2016 while True:
2017 c = txt.read(128)
2018 if not c:
2019 break
2020 reads += c
2021 self.assertEquals(reads, "A"*127+"\nB")
2022
2023 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002024 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002025
2026 # read one char at a time
2027 reads = ""
2028 while True:
2029 c = txt.read(1)
2030 if not c:
2031 break
2032 reads += c
2033 self.assertEquals(reads, self.normalized)
2034
2035 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002036 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002037 txt._CHUNK_SIZE = 4
2038
2039 reads = ""
2040 while True:
2041 c = txt.read(4)
2042 if not c:
2043 break
2044 reads += c
2045 self.assertEquals(reads, self.normalized)
2046
2047 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002048 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002049 txt._CHUNK_SIZE = 4
2050
2051 reads = txt.read(4)
2052 reads += txt.read(4)
2053 reads += txt.readline()
2054 reads += txt.readline()
2055 reads += txt.readline()
2056 self.assertEquals(reads, self.normalized)
2057
2058 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002059 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002060 txt._CHUNK_SIZE = 4
2061
2062 reads = txt.read(4)
2063 reads += txt.read()
2064 self.assertEquals(reads, self.normalized)
2065
2066 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002067 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002068 txt._CHUNK_SIZE = 4
2069
2070 reads = txt.read(4)
2071 pos = txt.tell()
2072 txt.seek(0)
2073 txt.seek(pos)
2074 self.assertEquals(txt.read(4), "BBB\n")
2075
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002076 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002077 buffer = self.BytesIO(self.testdata)
2078 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002079
2080 self.assertEqual(buffer.seekable(), txt.seekable())
2081
Antoine Pitroue4501852009-05-14 18:55:55 +00002082 def test_append_bom(self):
2083 # The BOM is not written again when appending to a non-empty file
2084 filename = support.TESTFN
2085 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2086 with self.open(filename, 'w', encoding=charset) as f:
2087 f.write('aaa')
2088 pos = f.tell()
2089 with self.open(filename, 'rb') as f:
2090 self.assertEquals(f.read(), 'aaa'.encode(charset))
2091
2092 with self.open(filename, 'a', encoding=charset) as f:
2093 f.write('xxx')
2094 with self.open(filename, 'rb') as f:
2095 self.assertEquals(f.read(), 'aaaxxx'.encode(charset))
2096
2097 def test_seek_bom(self):
2098 # Same test, but when seeking manually
2099 filename = support.TESTFN
2100 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2101 with self.open(filename, 'w', encoding=charset) as f:
2102 f.write('aaa')
2103 pos = f.tell()
2104 with self.open(filename, 'r+', encoding=charset) as f:
2105 f.seek(pos)
2106 f.write('zzz')
2107 f.seek(0)
2108 f.write('bbb')
2109 with self.open(filename, 'rb') as f:
2110 self.assertEquals(f.read(), 'bbbzzz'.encode(charset))
2111
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002112 def test_errors_property(self):
2113 with self.open(support.TESTFN, "w") as f:
2114 self.assertEqual(f.errors, "strict")
2115 with self.open(support.TESTFN, "w", errors="replace") as f:
2116 self.assertEqual(f.errors, "replace")
2117
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    # Twenty threads each write one distinct line to the same
    # line-buffered text file; afterwards every line must appear in the
    # file exactly once.
    event = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def run(n):
            text = "Thread%03d\n" % n
            # Block until all threads are started so the writes overlap.
            event.wait()
            f.write(text)
        threads = [threading.Thread(target=run, args=(x,))
                   for x in range(20)]
        for t in threads:
            t.start()
        # Give the threads a moment to reach event.wait() before release.
        time.sleep(0.02)
        event.set()
        for t in threads:
            t.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
        for n in range(20):
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
2139
def test_flush_error_on_close(self):
    # An exception raised by flush() during close() must reach the
    # caller rather than being swallowed.
    raw = self.BytesIO(self.testdata)
    txt = self.TextIOWrapper(raw, encoding="ascii")

    def failing_flush():
        raise IOError()

    txt.flush = failing_flush
    with self.assertRaises(IOError):
        txt.close()
2146
def test_multi_close(self):
    # close() may be called repeatedly without error; flush() on the
    # closed wrapper must raise ValueError.
    raw = self.BytesIO(self.testdata)
    txt = self.TextIOWrapper(raw, encoding="ascii")
    for _ in range(3):
        txt.close()
    with self.assertRaises(ValueError):
        txt.flush()
2153
class CTextIOWrapperTest(TextIOWrapperTest):
    # Extra TextIOWrapper tests that are only run through this subclass.

    def test_initialization(self):
        # Re-running __init__ with an invalid `newline` argument must
        # raise, and the wrapper must refuse to read afterwards.
        r = self.BytesIO(b"\xc3\xa9\n\n")
        b = self.BufferedReader(r, 1000)
        t = self.TextIOWrapper(b)
        self.assertRaises(TypeError, t.__init__, b, newline=42)
        self.assertRaises(ValueError, t.read)
        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
        self.assertRaises(ValueError, t.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        rawio = io.FileIO(support.TESTFN, "wb")
        b = self.BufferedWriter(rawio)
        t = self.TextIOWrapper(b, encoding="ascii")
        t.write("456def")
        # Create a reference cycle so that only the cyclic GC can free it.
        t.x = t
        wr = weakref.ref(t)
        del t
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")
2180
class PyTextIOWrapperTest(TextIOWrapperTest):
    """Run the inherited TextIOWrapperTest cases unchanged."""
2183
2184
class IncrementalNewlineDecoderTest(unittest.TestCase):
    # Shared tests for IncrementalNewlineDecoder.  The class under test is
    # looked up as self.IncrementalNewlineDecoder, so these tests only work
    # on subclasses that have that attribute set.

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def _check_decode(b, s, **kwargs):
            # We exercise getstate() / setstate() as well as decode()
            state = decoder.getstate()
            self.assertEqual(decoder.decode(b, **kwargs), s)
            decoder.setstate(state)
            self.assertEqual(decoder.decode(b, **kwargs), s)

        # NOTE: the calls below are order-dependent; each one relies on
        # the decoder state left behind by the previous one.
        _check_decode(b'\xe8\xa2\x88', "\u8888")

        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        _check_decode(b'\xe8', "")
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        _check_decode(b'\n', "\n")
        _check_decode(b'\r', "")
        _check_decode(b'', "\n", final=True)
        _check_decode(b'\r', "\n", final=True)

        _check_decode(b'\r', "")
        _check_decode(b'a', "\na")

        _check_decode(b'\r\r\n', "\n\n")
        _check_decode(b'\r', "")
        _check_decode(b'\r', "\n")
        _check_decode(b'\na', "\na")

        _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
        _check_decode(b'\xe8\xa2\x88', "\u8888")
        _check_decode(b'\n', "\n")
        _check_decode(b'\xe8\xa2\x88\r', "\u8888")
        _check_decode(b'\n', "\n")

    def check_newline_decoding(self, decoder, encoding):
        # Feed text to `decoder` one unit at a time and check both the
        # translated output and the `newlines` attribute as it evolves.
        # With an encoding, the text is encoded and fed byte by byte;
        # with encoding=None, it is fed character by character.
        result = []
        if encoding is not None:
            encoder = codecs.getincrementalencoder(encoding)()
            def _decode_bytewise(s):
                # Decode one byte at a time
                for b in encoder.encode(s):
                    result.append(decoder.decode(bytes([b])))
        else:
            encoder = None
            def _decode_bytewise(s):
                # Decode one char at a time
                for c in s:
                    result.append(decoder.decode(c))
        self.assertEqual(decoder.newlines, None)
        _decode_bytewise("abc\n\r")
        self.assertEqual(decoder.newlines, '\n')
        _decode_bytewise("\nabc")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc")
        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
        # After reset() the record of seen newlines must be cleared.
        decoder.reset()
        data = "abc"
        if encoder is not None:
            encoder.reset()
            data = encoder.encode(data)
        self.assertEqual(decoder.decode(data), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc is not None:
                inner = codecs.getincrementaldecoder(enc)()
            else:
                inner = None
            decoder = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(decoder, enc)
        decoder = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
        self.check_newline_decoding_utf8(decoder)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        # "\u0D00" and "\u0A00" are not newlines and must pass through
        # untouched, without being recorded in `newlines`.
        def _check(dec):
            self.assertEqual(dec.newlines, None)
            self.assertEqual(dec.decode("\u0D00"), "\u0D00")
            self.assertEqual(dec.newlines, None)
            self.assertEqual(dec.decode("\u0A00"), "\u0A00")
            self.assertEqual(dec.newlines, None)
        dec = self.IncrementalNewlineDecoder(None, translate=False)
        _check(dec)
        dec = self.IncrementalNewlineDecoder(None, translate=True)
        _check(dec)
2290
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Run the inherited IncrementalNewlineDecoderTest cases unchanged."""
2293
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Run the inherited IncrementalNewlineDecoderTest cases unchanged."""
Antoine Pitrou180a3362008-12-14 16:36:46 +00002296
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002297
Guido van Rossum01a27522007-03-07 01:00:12 +00002298# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002299
class MiscIOTest(unittest.TestCase):
    # Miscellaneous checks on an io implementation.  Subclasses set the
    # class attribute `io` to the module under test.

    def tearDown(self):
        support.unlink(support.TESTFN)

    def test___all__(self):
        # Every name in __all__ must exist; exception names must be
        # Exception subclasses, and everything else except open() and the
        # SEEK_* constants must derive from IOBase.
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertIsNotNone(obj, name)
            if name == "open":
                continue
            elif "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase))

    def test_attributes(self):
        # Check the `name` and `mode` attributes at each layer of the
        # objects returned by open().  Files are opened with `with` so
        # they are closed even if an assertion fails.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        with self.open(support.TESTFN, "U") as f:
            self.assertEqual(f.name, support.TESTFN)
            self.assertEqual(f.buffer.name, support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode, "U")
            self.assertEqual(f.buffer.mode, "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        with self.open(support.TESTFN, "w+") as f:
            self.assertEqual(f.mode, "w+")
            self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
            self.assertEqual(f.buffer.raw.mode, "rb+")

            # Opening by file descriptor: `name` is the descriptor itself.
            with self.open(f.fileno(), "wb", closefd=False) as g:
                self.assertEqual(g.mode, "wb")
                self.assertEqual(g.raw.mode, "wb")
                self.assertEqual(g.name, f.fileno())
                self.assertEqual(g.raw.name, f.fileno())

    def test_io_after_close(self):
        # Every I/O method must raise ValueError once the file is closed,
        # for each combination of mode and buffering listed below.
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            # peek/read1/readinto only exist on some of the returned types.
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            # Pass bytes to binary files and str to text files.
            self.assertRaises(ValueError, f.write,
                              b"" if "b" in kwargs['mode'] else "")
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        self.assertRaises(TypeError, self.BlockingIOError)
        self.assertRaises(TypeError, self.BlockingIOError, 1)
        self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
        self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
        b = self.BlockingIOError(1, "")
        self.assertEqual(b.characters_written, 0)
        class C(str):
            pass
        c = C("")
        b = self.BlockingIOError(1, c)
        # Build a reference cycle between the exception and its argument,
        # then check that garbage collection can reclaim it.
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        self.assertIsInstance(self.IOBase, abc.ABCMeta)
        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        # Check which of abcmodule's ABCs the objects returned by open()
        # are instances of: raw (unbuffered binary), buffered binary,
        # and text.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "wb") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "w") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertIsInstance(f, abcmodule.TextIOBase)

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)
2435
class CMiscIOTest(MiscIOTest):
    # Run the MiscIOTest cases against the `io` module.
    io = io
2438
class PyMiscIOTest(MiscIOTest):
    # Run the MiscIOTest cases against the module bound to `pyio`.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002441
def test_main():
    # Collect the test classes, inject the matching implementation
    # namespace into each one, then run them all.
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,)

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {}
    py_io_ns = {}
    for member in all_members:
        c_io_ns[member] = getattr(io, member)
    for member in all_members:
        py_io_ns[member] = getattr(pyio, member)

    # Each mock is exposed under its plain name, resolved to the "C"- or
    # "Py"-prefixed variant defined at module level.
    globs = globals()
    for mock in mocks:
        c_io_ns[mock.__name__] = globs["C" + mock.__name__]
    for mock in mocks:
        py_io_ns[mock.__name__] = globs["Py" + mock.__name__]
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper

    for test in tests:
        test_name = test.__name__
        if test_name.startswith("C"):
            namespace = c_io_ns
        elif test_name.startswith("Py"):
            namespace = py_io_ns
        else:
            # Classes with neither prefix get nothing injected.
            continue
        for attr, value in namespace.items():
            setattr(test, attr, value)

    support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002474
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()