blob: fa96f9a600936cf8d3074686744dc38a011b59a2 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import random
Guido van Rossum28524c72007-02-27 05:47:44 +000027import unittest
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000028import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000029import abc
Georg Brandl1b37e872010-03-14 10:45:50 +000030from itertools import cycle, count
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000031from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000032from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000033
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000034import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000035import io # C implementation of io
36import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000037try:
38 import threading
39except ImportError:
40 threading = None
Guido van Rossum28524c72007-02-27 05:47:44 +000041
Guido van Rossuma9e20242007-03-08 00:43:48 +000042
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000043def _default_chunk_size():
44 """Get the default TextIOWrapper chunk size"""
45 with open(__file__, "r", encoding="latin1") as f:
46 return f._CHUNK_SIZE
47
48
class MockRawIO:
    """Scripted raw stream for exercising buffered I/O objects.

    Reads are served from a queue of pre-supplied chunks (``read_stack``);
    writes are recorded in ``_write_stack``.  ``_reads`` counts how many
    times ``read()`` or ``readinto()`` has been called.
    """

    def __init__(self, read_stack=()):
        # Copy so the caller's sequence is never mutated.
        self._read_stack = list(read_stack)
        self._write_stack = []
        self._reads = 0

    def read(self, n=None):
        """Pop and return the next scripted chunk; ``n`` is ignored.

        Returns ``b""`` once the script is exhausted.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an empty script means EOF; anything else (e.g.
            # KeyboardInterrupt) must not be swallowed.
            return b""

    def write(self, b):
        """Record a bytes copy of ``b`` and return ``len(b)``."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the head of the scripted data into ``buf``.

        Returns the number of bytes copied, ``0`` when the script is
        exhausted, or ``None`` when the next scripted item is ``None``
        (that item is consumed).  A chunk larger than ``buf`` is split and
        the remainder is left at the head of the queue.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            del self._read_stack[0]
            buf[:n] = data
            return n
        else:
            buf[:] = data[:max_len]
            self._read_stack[0] = data[max_len:]
            return max_len

    def truncate(self, pos=None):
        """Return ``pos`` unchanged; nothing is actually truncated."""
        return pos
107
class CMockRawIO(MockRawIO, io.RawIOBase):
    # MockRawIO combined with RawIOBase from ``io`` (the C implementation).
    pass
110
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    # MockRawIO combined with RawIOBase from ``_pyio`` (the Python
    # implementation).
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000113
Guido van Rossuma9e20242007-03-08 00:43:48 +0000114
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods report deliberately bogus results."""

    def write(self, b):
        # Report twice the count the parent recorded.
        reported = super().write(b)
        return reported * 2

    def read(self, n=None):
        # Hand back the parent's chunk repeated twice.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        # A negative position, whatever was requested.
        return -123

    def tell(self):
        # A negative position.
        return -456

    def readinto(self, buf):
        # Let the parent fill the buffer, then claim five times its size.
        super().readinto(buf)
        claimed = len(buf) * 5
        return claimed
131
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    # MisbehavedRawIO combined with RawIOBase from ``io`` (the C
    # implementation).
    pass
134
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    # MisbehavedRawIO combined with RawIOBase from ``_pyio`` (the Python
    # implementation).
    pass
137
138
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first ``close()`` call raises IOError.

    The stream is marked closed before the error is raised, so later
    ``close()`` calls return silently.
    """

    closed = 0

    def close(self):
        if self.closed:
            return
        self.closed = 1
        raise IOError
146
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    # CloseFailureIO combined with RawIOBase from ``io`` (the C
    # implementation).
    pass
149
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    # CloseFailureIO combined with RawIOBase from ``_pyio`` (the Python
    # implementation).
    pass
152
153
class MockFileIO:
    """Mixin that logs the size of every read served by the next base class.

    ``read_history`` receives one entry per ``read()``/``readinto()`` call:
    the length of the data returned by ``read()`` (or ``None`` if it
    returned ``None``), or the raw return value of ``readinto()``.
    """

    def __init__(self, data):
        # The history list must exist before the base initializer runs.
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000169
class CMockFileIO(MockFileIO, io.BytesIO):
    # Read-logging mixin over BytesIO from ``io`` (the C implementation).
    pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000172
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    # Read-logging mixin over BytesIO from ``_pyio`` (the Python
    # implementation).
    pass
175
176
class MockNonBlockWriterIO:
    """Writable mock that can simulate a write that would block.

    Written data accumulates in ``_write_stack``.  After ``block_on(char)``,
    the next write whose data contains ``char`` records only the bytes
    before it and raises ``self.BlockingIOError`` (which concrete
    subclasses must define as a class attribute).
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far as one bytes object and reset."""
        written = b"".join(self._write_stack)
        self._write_stack.clear()
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        data = bytes(b)
        blocker = self._blocker_char
        if blocker:
            try:
                cut = data.index(blocker)
            except ValueError:
                # Blocker not present: fall through to a normal write.
                cut = None
            if cut is not None:
                # One-shot: the blocker is disarmed once it fires.
                self._blocker_char = None
                self._write_stack.append(data[:cut])
                raise self.BlockingIOError(0, "test blocking", cut)
        self._write_stack.append(data)
        return len(data)
215
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    # Exception type raised by MockNonBlockWriterIO.write() when blocking
    # is simulated; taken from ``io`` (the C implementation).
    BlockingIOError = io.BlockingIOError
218
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    # Exception type raised by MockNonBlockWriterIO.write() when blocking
    # is simulated; taken from ``_pyio`` (the Python implementation).
    BlockingIOError = pyio.BlockingIOError
221
Guido van Rossuma9e20242007-03-08 00:43:48 +0000222
class IOTest(unittest.TestCase):
    # General tests of open() and the raw/buffered/text file objects.
    #
    # The tests go through ``self.open``, ``self.FileIO``, ``self.BytesIO``,
    # ``self.IOBase`` and friends rather than naming an implementation
    # directly; those attributes are not set in this class and must be
    # provided before the tests run.

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        """Run a fixed write/seek/truncate script against writable ``f``.

        When the script finishes the stream holds ``b"hello world\\n"``.
        """
        self.assertEqual(f.write(b"blah."), 5)
        # Truncating must not move the stream position.
        f.truncate(0)
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Run a fixed read/seek script against ``f``.

        ``f`` must contain ``b"hello world\\n"``.  With ``buffered`` true,
        argument-less ``read()`` calls are checked as well.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    LARGE = 2**31

    def large_file_ops(self, f):
        """Seek, write and truncate around offset ``LARGE`` in ``f``.

        ``f`` must be both readable and writable.
        """
        # assertTrue rather than a bare assert, which is stripped under -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        # Truncating must not move the stream position.
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(IOError, fp.read)
                self.assertRaises(IOError, fp.readline)
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(IOError, fp.write, b"blah")
            self.assertRaises(IOError, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(IOError, fp.write, "blah")
            self.assertRaises(IOError, fp.writelines, ["blah\n"])

    def test_raw_file_io(self):
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space,
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                # Report as skipped instead of printing and passing silently.
                self.skipTest(
                    "large file ops skipped on %s: requires %d bytes and a "
                    "long time; use 'regrtest.py -u largefile test_io' to "
                    "run it" % (sys.platform, self.LARGE))
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # The file must be closed on leaving the with block, whether the
        # block completes normally or raises.
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        # Destroying a FileIO subclass must invoke __del__, close() and
        # flush() in that order, and the written data must reach the file.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        """Check the __del__/close()/flush() order for a subclass of ``base``."""
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        a = array.array('i', range(10))
        # Byte size of the array, computed without the deprecated tostring().
        n = len(a) * a.itemsize
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        # Self-reference, so the object is part of a reference cycle.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)
538
class CIOTest(IOTest):
    # NOTE(review): presumably the IOTest run against the C ``io`` module;
    # the implementation attributes (self.open, self.FileIO, ...) are not
    # bound in this part of the file -- confirm where they are set.
    pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000541
class PyIOTest(IOTest):
    # NOTE(review): presumably the IOTest run against the Python ``_pyio``
    # module; the implementation attributes (self.open, self.FileIO, ...)
    # are not bound in this part of the file -- confirm where they are set.
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000544
Guido van Rossuma9e20242007-03-08 00:43:48 +0000545
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    #
    # This is a mixin: the concrete test class supplies ``self.tp`` (the
    # buffered type under test) along with ``self.MockRawIO`` and
    # ``self.CloseFailureIO``.

    def test_detach(self):
        # detach() hands back the raw stream; a second detach() fails.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        self.assertIs(buf.detach(), raw)
        self.assertRaises(ValueError, buf.detach)

    def test_fileno(self):
        # fileno() is forwarded to the raw stream (the mock returns 42).
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        self.assertEqual(42, bufio.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Invalid whence
        self.assertRaises(ValueError, bufio.seek, 0, -1)
        self.assertRaises(ValueError, bufio.seek, 0, 3)

    def test_override_destructor(self):
        # Destroying a subclass must call __del__ then close(); flush()
        # is additionally expected when the object is writable.
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        # Query before deletion; the object is gone afterwards.
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        if writable:
            self.assertEqual(record, [1, 2, 3])
        else:
            self.assertEqual(record, [1, 2])

    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        def _with():
            with bufio:
                pass
        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            self.tp(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception IOError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    def test_repr(self):
        # repr() shows the qualified class name and, once the raw stream
        # has a ``name`` attribute, that name as well.
        raw = self.MockRawIO()
        b = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(b), "<%s>" % clsname)
        raw.name = "dummy"
        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
637
Guido van Rossum78892e42007-04-06 17:31:18 +0000638
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000639class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
640 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000641
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000642 def test_constructor(self):
643 rawio = self.MockRawIO([b"abc"])
644 bufio = self.tp(rawio)
645 bufio.__init__(rawio)
646 bufio.__init__(rawio, buffer_size=1024)
647 bufio.__init__(rawio, buffer_size=16)
648 self.assertEquals(b"abc", bufio.read())
649 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
650 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
651 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
652 rawio = self.MockRawIO([b"abc"])
653 bufio.__init__(rawio)
654 self.assertEquals(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000655
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000656 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000657 for arg in (None, 7):
658 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
659 bufio = self.tp(rawio)
660 self.assertEquals(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000661 # Invalid args
662 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000663
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000664 def test_read1(self):
665 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
666 bufio = self.tp(rawio)
667 self.assertEquals(b"a", bufio.read(1))
668 self.assertEquals(b"b", bufio.read1(1))
669 self.assertEquals(rawio._reads, 1)
670 self.assertEquals(b"c", bufio.read1(100))
671 self.assertEquals(rawio._reads, 1)
672 self.assertEquals(b"d", bufio.read1(100))
673 self.assertEquals(rawio._reads, 2)
674 self.assertEquals(b"efg", bufio.read1(100))
675 self.assertEquals(rawio._reads, 3)
676 self.assertEquals(b"", bufio.read1(100))
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000677 self.assertEquals(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000678 # Invalid args
679 self.assertRaises(ValueError, bufio.read1, -1)
680
681 def test_readinto(self):
682 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
683 bufio = self.tp(rawio)
684 b = bytearray(2)
685 self.assertEquals(bufio.readinto(b), 2)
686 self.assertEquals(b, b"ab")
687 self.assertEquals(bufio.readinto(b), 2)
688 self.assertEquals(b, b"cd")
689 self.assertEquals(bufio.readinto(b), 2)
690 self.assertEquals(b, b"ef")
691 self.assertEquals(bufio.readinto(b), 1)
692 self.assertEquals(b, b"gf")
693 self.assertEquals(bufio.readinto(b), 0)
694 self.assertEquals(b, b"gf")
695
def test_readlines(self):
    # Each assertion needs a fresh stream, hence the factory.
    def bufio():
        rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
        return self.tp(rawio)
    self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
    self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
    self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
703
def test_buffering(self):
    data = b"abcdefghi"
    dlen = len(data)

    # Each entry: [buffer size, sizes passed to bufio.read(),
    #              expected rawio.read_history afterwards]
    tests = [
        [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
        [ 100, [ 3, 3, 3],     [ dlen ]    ],
        [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
    ]

    for bufsize, buf_read_sizes, raw_read_sizes in tests:
        rawio = self.MockFileIO(data)
        bufio = self.tp(rawio, buffer_size=bufsize)
        pos = 0
        for nbytes in buf_read_sizes:
            self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
            pos += nbytes
        # this is mildly implementation-dependent
        self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000723
def test_read_non_blocking(self):
    # Inject some None's in there to simulate EWOULDBLOCK
    rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
    bufio = self.tp(rawio)

    self.assertEqual(b"abcd", bufio.read(6))
    self.assertEqual(b"e", bufio.read(1))
    self.assertEqual(b"fg", bufio.read())
    self.assertEqual(b"", bufio.peek(1))
    # With nothing buffered and the raw stream "blocking", read() is
    # expected to return None rather than b"".
    self.assertIsNone(bufio.read())
    self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000735
def test_read_past_eof(self):
    # Asking for far more than is available returns just what exists.
    rawio = self.MockRawIO((b"abc", b"d", b"efg"))
    bufio = self.tp(rawio)

    self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000741
def test_read_all(self):
    # read() without a size concatenates every chunk of the raw stream.
    rawio = self.MockRawIO((b"abc", b"d", b"efg"))
    bufio = self.tp(rawio)

    self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000747
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads(self):
    try:
        # The file holds each byte value 0..255 exactly N times, in random
        # order.  Counting the values after concurrent reading shows
        # whether any content was duplicated or forgotten.
        N = 1000
        values = list(range(256)) * N
        random.shuffle(values)
        payload = bytes(bytearray(values))
        with self.open(support.TESTFN, "wb") as out:
            out.write(payload)
        with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            errors = []
            results = []

            def reader():
                try:
                    # Alternate a read that fits inside the 8-byte buffer
                    # with one that is larger than it.
                    for size in cycle([1, 19]):
                        chunk = bufio.read(size)
                        if not chunk:
                            break
                        # list.append() is atomic
                        results.append(chunk)
                except Exception as exc:
                    errors.append(exc)
                    raise

            workers = [threading.Thread(target=reader) for _ in range(20)]
            for worker in workers:
                worker.start()
            time.sleep(0.02) # yield
            for worker in workers:
                worker.join()
            self.assertFalse(errors,
                "the following exceptions were caught: %r" % errors)
            combined = b''.join(results)
            for value in range(256):
                self.assertEqual(combined.count(bytes([value])), N)
    finally:
        support.unlink(support.TESTFN)
790
def test_misbehaved_io(self):
    # Both positioning calls must surface an IOError when layered over
    # MisbehavedRawIO (defined outside this chunk).
    raw = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
    buffered = self.tp(raw)
    with self.assertRaises(IOError):
        buffered.seek(0)
    with self.assertRaises(IOError):
        buffered.tell()
796
class CBufferedReaderTest(BufferedReaderTest):
    # Runs the shared BufferedReaderTest cases against io.BufferedReader
    # and adds checks that only apply to that implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed re-initialisation the object must refuse to
        # read instead of operating on a half-built buffer.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The FileIO below creates TESTFN; make sure it does not outlive
        # the test.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Build a reference cycle so only the cyclic GC can free the object.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000837
# Runs the shared BufferedReaderTest cases against pyio.BufferedReader
# (the `pyio` name is bound outside this chunk; presumably the pure-Python
# implementation mentioned in the file header).
class PyBufferedReaderTest(BufferedReaderTest):
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000840
Guido van Rossuma9e20242007-03-08 00:43:48 +0000841
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Implementation-independent tests for buffered writers.  Subclasses
    # set `tp` to the class under test.
    write_mode = "wb"

    def test_constructor(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        # A valid re-initialisation makes the object usable again.
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # detach() must push pending data to the raw stream first.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # `intermediate_func(bufio)` is invoked after every write so that
        # callers can interleave other operations.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seek away and back (absolute, then relative); the position ends
        # where it started, so the written output must be unaffected.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        # Seeking must flush the pending b"XY" at offset 0.
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_destructor(self):
        # Dropping the last reference must flush buffered data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # The file created below must not be left behind after the test.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            # truncate() does not move the stream position.
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Chunk order is arbitrary, but every byte value must appear
            # exactly N times.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing a third positional argument must emit the warning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001056
1057
class CBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriterTest cases against io.BufferedWriter
    # and adds checks that only apply to that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed re-initialisation the object must refuse to
        # write instead of operating on a half-built buffer.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The FileIO below creates TESTFN; make sure it does not outlive
        # the test.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Build a reference cycle so only the cyclic GC can free the object.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1095
1096
# Runs the shared BufferedWriterTest cases against pyio.BufferedWriter
# (the `pyio` name is bound outside this chunk; presumably the pure-Python
# implementation mentioned in the file header).
class PyBufferedWriterTest(BufferedWriterTest):
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001099
class BufferedRWPairTest(unittest.TestCase):
    # Implementation-independent tests for BufferedRWPair.  Subclasses set
    # `tp` to the class under test.

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_detach(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_deprecation(self):
        # Passing a fourth positional argument must emit the warning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        # Each assertion needs a fresh pair, hence the factory.
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        # An explicit None hint must behave like no hint at all.
        self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # peek() may return more than requested, hence startswith().
        self.assertTrue(pair.peek(3).startswith(b"abc"))
        # Peeking must not consume anything.
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        # Raw stream whose isatty() answer is chosen at construction time.
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # The pair is a tty as soon as either side is one.
        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001217
# Runs the shared BufferedRWPairTest cases against io.BufferedRWPair.
class CBufferedRWPairTest(BufferedRWPairTest):
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001220
# Runs the shared BufferedRWPairTest cases against pyio.BufferedRWPair
# (the `pyio` name is bound outside this chunk; presumably the pure-Python
# implementation mentioned in the file header).
class PyBufferedRWPairTest(BufferedRWPairTest):
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001223
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001224
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Implementation-independent tests for BufferedRandom: everything the
    # reader and writer suites check, plus mixed read/write scenarios.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        # Reading must flush the pending writes first.
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # `read_func(bufio[, n])` abstracts over the different ways of
        # consuming data (read, readinto, peek + seek).
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        # Modify the raw stream behind the buffered object's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # A negative n means "read everything": use a buffer that is
            # comfortably larger than the test data.
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            # peek() does not advance the position; do it by hand.
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)
        # Same again, but peek at the byte that was just written.
        def _peek(bufio):
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek)

    def test_writes_and_reads(self):
        # Step back one byte and read it again; the position ends up
        # unchanged.
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            s = raw.getvalue()
            self.assertEqual(s,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)
1379
class CBufferedRandomTest(BufferedRandomTest):
    # Runs the shared BufferedRandomTest cases against io.BufferedRandom.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            buffered.__init__(raw, sys.maxsize)

    def test_garbage_collection(self):
        # Reuse the reader check first, then the writer check.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)
1396
# Runs the shared BufferedRandomTest cases against pyio.BufferedRandom
# (the `pyio` name is bound outside this chunk; presumably the pure-Python
# implementation mentioned in the file header).
class PyBufferedRandomTest(BufferedRandomTest):
    tp = pyio.BufferedRandom
1399
1400
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001401# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1402# properties:
1403# - A single output character can correspond to many bytes of input.
1404# - The number of input bytes to complete the character can be
1405# undetermined until the last input byte is received.
1406# - The number of input bytes can vary depending on previous input.
1407# - A single input byte can correspond to many characters of output.
1408# - The number of output characters can be undetermined until the
1409# last input byte is received.
1410# - The number of output characters can vary depending on previous input.
1411
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    A stateful, buffering decoder for testing seek/tell behavior.

    The input is a stream of words.  A word is either fixed-length (the
    length I is set by the input itself) or, when I is 0, variable-length
    and terminated by a period; in that mode empty words (extra periods)
    are skipped.  Recognized words:
      - 'i' followed by a number sets the input length I (capped at 99).
        Setting I to 0 switches to period-terminated words.
      - 'o' followed by a number sets the output length O (capped at 99).
      - Anything else is emitted followed by a period.  The emitted word
        is truncated, or padded with hyphens, to exactly O characters;
        if O is 0 it is emitted verbatim.
    I and O both start at 1.  Buffered input is interpreted according to
    whatever I is in effect when the next byte arrives.  EOF (final=True)
    terminates the last word.
    """

    # lookupTestDecoder() only answers while a test has switched this on.
    codecEnabled = False

    def __init__(self, errors='strict'):
        super().__init__(errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        # Initial state: one byte in, one character out, nothing buffered.
        self.i = self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        # I and O are XOR-ed with 1 so that the flags integer is 0 right
        # after reset(); the two values are packed as decimal digit pairs.
        flags = (self.i ^ 1) * 100 + (self.o ^ 1)
        return bytes(self.buffer), flags

    def setstate(self, state):
        pending, flags = state
        packed_i, packed_o = divmod(flags, 100)
        self.buffer = bytearray(pending)
        self.i = packed_i ^ 1
        self.o = packed_o ^ 1

    def decode(self, input, final=False):
        period = ord('.')
        pieces = []
        for byte in input:
            # self.i may change as a side effect of process_word(), so the
            # mode is re-checked for every byte.
            if self.i:
                # Fixed-length mode: a word is complete after self.i bytes.
                self.buffer.append(byte)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
            elif byte != period:
                # Variable-length mode: keep collecting until a period.
                self.buffer.append(byte)
            elif self.buffer:
                # A period ends the word; periods after an empty buffer
                # are ignored.
                pieces.append(self.process_word())
        if final and self.buffer:
            # EOF terminates whatever word is still pending.
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        # Interpret the buffered word, clear the buffer, and return the
        # text it produces ('' for the 'i'/'o' control words).
        first, rest = self.buffer[0], self.buffer[1:]
        if first == ord('i'):
            self.i = min(99, int(rest or 0))    # set input length
            produced = ''
        elif first == ord('o'):
            self.o = min(99, int(rest or 0))    # set output length
            produced = ''
        else:
            word = self.buffer.decode('ascii')
            if self.o:
                # Pad with hyphens, then cut, to exactly self.o characters.
                word = word.ljust(self.o, '-')[:self.o]
            produced = word + '.'
        self.buffer = bytearray()
        return produced

    @classmethod
    def lookupTestDecoder(cls, name):
        # Codec search function: answers only for 'test_decoder', and only
        # while codecEnabled is set; otherwise returns None.
        if not cls.codecEnabled or name != 'test_decoder':
            return None
        latin1 = codecs.lookup('latin-1')
        return codecs.CodecInfo(
            name='test_decoder',
            encode=latin1.encode,
            decode=None,
            incrementalencoder=None,
            incrementaldecoder=cls,
            streamreader=None,
            streamwriter=None)

# Install the search function for the decoder above.  It stays inert
# (codecEnabled is False) until a test turns it on.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1500
1501
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag passed to decode(), expected
    # output).  TextIOWrapperTest.test_seek_and_tell reuses the inputs.
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases, each on a fresh decoder.
        for data, eof, expected in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(data, eof), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001544
class TextIOWrapperTest(unittest.TestCase):
    # Shared tests for TextIOWrapper.  The methods reach the classes under
    # test through attributes of self (self.TextIOWrapper, self.BytesIO,
    # self.BufferedReader, self.BufferedWriter, self.open,
    # self.CloseFailureIO), none of which are defined in this class.
    #
    # Test methods carry comments rather than docstrings so that verbose
    # unittest output keeps showing the method names.

    def setUp(self):
        self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
        self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def test_constructor(self):
        # __init__ may be called again on a live object to reconfigure it.
        r = self.BytesIO(b"\xc3\xa9\n\n")
        b = self.BufferedReader(r, 1000)
        t = self.TextIOWrapper(b)
        t.__init__(b, encoding="latin1", newline="\r\n")
        self.assertEqual(t.encoding, "latin1")
        self.assertEqual(t.line_buffering, False)
        t.__init__(b, encoding="utf8", line_buffering=True)
        self.assertEqual(t.encoding, "utf8")
        self.assertEqual(t.line_buffering, True)
        self.assertEqual("\xe9\n", t.readline())
        self.assertRaises(TypeError, t.__init__, b, newline=42)
        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')

    def test_detach(self):
        r = self.BytesIO()
        b = self.BufferedWriter(r)
        t = self.TextIOWrapper(b)
        self.assertIs(t.detach(), b)

        # detach() must flush pending text; a second detach() is an error.
        t = self.TextIOWrapper(b, encoding="ascii")
        t.write("howdy")
        self.assertFalse(r.getvalue())
        t.detach()
        self.assertEqual(r.getvalue(), b"howdy")
        self.assertRaises(ValueError, t.detach)

    def test_repr(self):
        raw = self.BytesIO("hello".encode("utf-8"))
        b = self.BufferedReader(raw)
        t = self.TextIOWrapper(b, encoding="utf-8")
        modname = self.TextIOWrapper.__module__
        self.assertEqual(repr(t),
                         "<%s.TextIOWrapper encoding='utf-8'>" % modname)
        raw.name = "dummy"
        self.assertEqual(repr(t),
                         "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
        raw.name = b"dummy"
        self.assertEqual(repr(t),
                         "<%s.TextIOWrapper name=b'dummy' encoding='utf-8'>" % modname)

    def test_line_buffering(self):
        r = self.BytesIO()
        b = self.BufferedWriter(r, 1000)
        t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
        t.write("X")
        self.assertEqual(r.getvalue(), b"")  # No flush happened
        t.write("Y\nZ")
        self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed
        t.write("A\rB")
        self.assertEqual(r.getvalue(), b"XY\nZA\rB")

    def test_encoding(self):
        # Check the encoding attribute is always set, and valid
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="utf8")
        self.assertEqual(t.encoding, "utf8")
        t = self.TextIOWrapper(b)
        self.assertIsNotNone(t.encoding)
        codecs.lookup(t.encoding)

    def test_encoding_errors_reading(self):
        # (1) default
        b = self.BytesIO(b"abc\n\xff\n")
        t = self.TextIOWrapper(b, encoding="ascii")
        self.assertRaises(UnicodeError, t.read)
        # (2) explicit strict
        b = self.BytesIO(b"abc\n\xff\n")
        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
        self.assertRaises(UnicodeError, t.read)
        # (3) ignore
        b = self.BytesIO(b"abc\n\xff\n")
        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
        self.assertEqual(t.read(), "abc\n\n")
        # (4) replace
        b = self.BytesIO(b"abc\n\xff\n")
        t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
        self.assertEqual(t.read(), "abc\n\ufffd\n")

    def test_encoding_errors_writing(self):
        # (1) default
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="ascii")
        self.assertRaises(UnicodeError, t.write, "\xff")
        # (2) explicit strict
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
        self.assertRaises(UnicodeError, t.write, "\xff")
        # (3) ignore
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
                               newline="\n")
        t.write("abc\xffdef\n")
        t.flush()
        self.assertEqual(b.getvalue(), b"abcdef\n")
        # (4) replace
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
                               newline="\n")
        t.write("abc\xffdef\n")
        t.flush()
        self.assertEqual(b.getvalue(), b"abc?def\n")

    def test_newlines(self):
        input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]

        # Each entry is [newline argument, lines expected back].
        tests = [
            [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
            [ '', input_lines ],
            [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
            [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
            [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
        ]
        encodings = (
            'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )

        # Try a range of buffer sizes to test the case where \r is the last
        # character in TextIOWrapper._pending_line.
        for encoding in encodings:
            data = ''.join(input_lines).encode(encoding)
            for do_reads in (False, True):
                for bufsize in range(1, 10):
                    for newline, exp_lines in tests:
                        bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                        textio = self.TextIOWrapper(bufio, newline=newline,
                                                    encoding=encoding)
                        if do_reads:
                            # Mix read(2) with readline() on every line.
                            got_lines = []
                            while True:
                                c2 = textio.read(2)
                                if c2 == '':
                                    break
                                self.assertEqual(len(c2), 2)
                                got_lines.append(c2 + textio.readline())
                        else:
                            got_lines = list(textio)

                        self.assertEqual(got_lines, exp_lines)

    def test_newlines_input(self):
        testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
        normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
        for newline, expected in [
            (None, normalized.decode("ascii").splitlines(True)),
            ("", testdata.decode("ascii").splitlines(True)),
            ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
            ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
            ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
            ]:
            buf = self.BytesIO(testdata)
            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
            self.assertEqual(txt.readlines(), expected)
            txt.seek(0)
            self.assertEqual(txt.read(), "".join(expected))

    def test_newlines_output(self):
        testdict = {
            "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
            "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
            "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
            "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
            }
        # newline=None is expected to behave like newline=os.linesep.
        tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
        for newline, expected in tests:
            buf = self.BytesIO()
            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
            txt.write("AAA\nB")
            txt.write("BB\nCCC\n")
            txt.write("X\rY\r\nZ")
            txt.flush()
            self.assertEqual(buf.closed, False)
            self.assertEqual(buf.getvalue(), expected)

    def test_destructor(self):
        # Dropping the last reference must flush and close the raw stream.
        l = []
        base = self.BytesIO
        class MyBytesIO(base):
            def close(self):
                l.append(self.getvalue())
                base.close(self)
        b = MyBytesIO()
        t = self.TextIOWrapper(b, encoding="ascii")
        t.write("abc")
        del t
        support.gc_collect()
        self.assertEqual([b"abc"], l)

    def test_override_destructor(self):
        # Expected call order on destruction: __del__, close(), flush().
        record = []
        class MyTextIO(self.TextIOWrapper):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        b = self.BytesIO()
        t = MyTextIO(b, encoding="ascii")
        del t
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            self.TextIOWrapper(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception IOError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    # Systematic tests of the text I/O API

    def test_basic_io(self):
        for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
            # "utf-16-be" and "utf-16-le" are not exercised here.
            for enc in "ascii", "latin1", "utf8":
                with self.open(support.TESTFN, "w+", encoding=enc) as f:
                    f._CHUNK_SIZE = chunksize
                    self.assertEqual(f.write("abc"), 3)
                with self.open(support.TESTFN, "r+", encoding=enc) as f:
                    f._CHUNK_SIZE = chunksize
                    self.assertEqual(f.tell(), 0)
                    self.assertEqual(f.read(), "abc")
                    cookie = f.tell()
                    self.assertEqual(f.seek(0), 0)
                    self.assertEqual(f.read(None), "abc")
                    f.seek(0)
                    self.assertEqual(f.read(2), "ab")
                    self.assertEqual(f.read(1), "c")
                    self.assertEqual(f.read(1), "")
                    self.assertEqual(f.read(), "")
                    self.assertEqual(f.tell(), cookie)
                    self.assertEqual(f.seek(0), 0)
                    self.assertEqual(f.seek(0, 2), cookie)
                    self.assertEqual(f.write("def"), 3)
                    self.assertEqual(f.seek(cookie), cookie)
                    self.assertEqual(f.read(), "def")
                    if enc.startswith("utf"):
                        self.multi_line_test(f, enc)

    def multi_line_test(self, f, enc):
        # Helper for test_basic_io: write lines of many lengths, recording
        # tell() before each, then check readline()/tell() reproduce the
        # same (position, line) pairs.  `enc` is accepted but not used.
        f.seek(0)
        f.truncate()
        sample = "s\xff\u0fff\uffff"
        wlines = []
        for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
            line = "".join(sample[i % len(sample)] for i in range(size)) + "\n"
            wlines.append((f.tell(), line))
            f.write(line)
        f.seek(0)
        rlines = []
        while True:
            pos = f.tell()
            line = f.readline()
            if not line:
                break
            rlines.append((pos, line))
        self.assertEqual(rlines, wlines)

    def test_telling(self):
        with self.open(support.TESTFN, "w+", encoding="utf8") as f:
            p0 = f.tell()
            f.write("\xff\n")
            p1 = f.tell()
            f.write("\xff\n")
            p2 = f.tell()
            f.seek(0)
            self.assertEqual(f.tell(), p0)
            self.assertEqual(f.readline(), "\xff\n")
            self.assertEqual(f.tell(), p1)
            self.assertEqual(f.readline(), "\xff\n")
            self.assertEqual(f.tell(), p2)
            f.seek(0)
            for line in f:
                self.assertEqual(line, "\xff\n")
                # tell() is refused while iteration is in progress.
                self.assertRaises(IOError, f.tell)
            self.assertEqual(f.tell(), p2)

    def test_seeking(self):
        # Place a multi-byte character so that it straddles the end of the
        # first chunk.
        chunk_size = _default_chunk_size()
        prefix_size = chunk_size - 2
        u_prefix = "a" * prefix_size
        prefix = bytes(u_prefix.encode("utf-8"))
        self.assertEqual(len(u_prefix), len(prefix))
        u_suffix = "\u8888\n"
        suffix = bytes(u_suffix.encode("utf-8"))
        line = prefix + suffix
        with self.open(support.TESTFN, "wb") as f:
            f.write(line*2)
        # Close the reader too, so tearDown can unlink the file.
        with self.open(support.TESTFN, "r", encoding="utf-8") as f:
            s = f.read(prefix_size)
            self.assertEqual(s, str(prefix, "ascii"))
            self.assertEqual(f.tell(), prefix_size)
            self.assertEqual(f.readline(), u_suffix)

    def test_seeking_too(self):
        # Regression test for a specific bug
        data = b'\xe0\xbf\xbf\n'
        with self.open(support.TESTFN, "wb") as f:
            f.write(data)
        # Close the reader too, so tearDown can unlink the file.
        with self.open(support.TESTFN, "r", encoding="utf-8") as f:
            f._CHUNK_SIZE  # Just test that it exists
            f._CHUNK_SIZE = 2
            f.readline()
            f.tell()

    def test_seek_and_tell(self):
        #Test seek/tell using the StatefulIncrementalDecoder.
        # Make test faster by doing smaller seeks
        CHUNK_SIZE = 128

        def test_seek_and_tell_with_data(data, min_pos=0):
            """Tell/seek to various points within a data stream and ensure
            that the decoded data returned by read() is consistent."""
            with self.open(support.TESTFN, 'wb') as f:
                f.write(data)
            with self.open(support.TESTFN, encoding='test_decoder') as f:
                f._CHUNK_SIZE = CHUNK_SIZE
                decoded = f.read()

            for i in range(min_pos, len(decoded) + 1): # seek positions
                for j in [1, 5, len(decoded) - i]: # read lengths
                    with self.open(support.TESTFN,
                                   encoding='test_decoder') as f:
                        self.assertEqual(f.read(i), decoded[:i])
                        cookie = f.tell()
                        self.assertEqual(f.read(j), decoded[i:i + j])
                        f.seek(cookie)
                        self.assertEqual(f.read(), decoded[i:])

        # Enable the test decoder.
        StatefulIncrementalDecoder.codecEnabled = 1

        # Run the tests.
        try:
            # Try each test case.
            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
                test_seek_and_tell_with_data(input)

            # Position each test case so that it crosses a chunk boundary.
            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
                offset = CHUNK_SIZE - len(input)//2
                prefix = b'.'*offset
                # Don't bother seeking into the prefix (takes too long).
                min_pos = offset*2
                test_seek_and_tell_with_data(prefix + input, min_pos)

        # Ensure our test decoder won't interfere with subsequent tests.
        finally:
            StatefulIncrementalDecoder.codecEnabled = 0

    def test_encoded_writes(self):
        data = "1234567890"
        tests = ("utf-16",
                 "utf-16-le",
                 "utf-16-be",
                 "utf-32",
                 "utf-32-le",
                 "utf-32-be")
        for encoding in tests:
            buf = self.BytesIO()
            f = self.TextIOWrapper(buf, encoding=encoding)
            # Check if the BOM is written only once (see issue1753).
            f.write(data)
            f.write(data)
            f.seek(0)
            self.assertEqual(f.read(), data * 2)
            f.seek(0)
            self.assertEqual(f.read(), data * 2)
            self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))

    def test_unreadable(self):
        class UnReadable(self.BytesIO):
            def readable(self):
                return False
        txt = self.TextIOWrapper(UnReadable())
        self.assertRaises(IOError, txt.read)

    def test_read_one_by_one(self):
        txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
        reads = ""
        while True:
            c = txt.read(1)
            if not c:
                break
            reads += c
        self.assertEqual(reads, "AA\nBB")

    def test_readlines(self):
        txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
        self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
        txt.seek(0)
        self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
        txt.seek(0)
        self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])

    # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
    def test_read_by_chunk(self):
        # make sure "\r\n" straddles 128 char boundary.
        txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
        reads = ""
        while True:
            c = txt.read(128)
            if not c:
                break
            reads += c
        self.assertEqual(reads, "A"*127+"\nB")

    def test_issue1395_1(self):
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")

        # read one char at a time
        reads = ""
        while True:
            c = txt.read(1)
            if not c:
                break
            reads += c
        self.assertEqual(reads, self.normalized)

    def test_issue1395_2(self):
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        reads = ""
        while True:
            c = txt.read(4)
            if not c:
                break
            reads += c
        self.assertEqual(reads, self.normalized)

    def test_issue1395_3(self):
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        reads = txt.read(4)
        reads += txt.read(4)
        reads += txt.readline()
        reads += txt.readline()
        reads += txt.readline()
        self.assertEqual(reads, self.normalized)

    def test_issue1395_4(self):
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        reads = txt.read(4)
        reads += txt.read()
        self.assertEqual(reads, self.normalized)

    def test_issue1395_5(self):
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        # Only the position after the first read matters, not the text.
        txt.read(4)
        pos = txt.tell()
        txt.seek(0)
        txt.seek(pos)
        self.assertEqual(txt.read(4), "BBB\n")

    def test_issue2282(self):
        buffer = self.BytesIO(self.testdata)
        txt = self.TextIOWrapper(buffer, encoding="ascii")

        self.assertEqual(buffer.seekable(), txt.seekable())

    def test_append_bom(self):
        # The BOM is not written again when appending to a non-empty file
        filename = support.TESTFN
        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
            with self.open(filename, 'w', encoding=charset) as f:
                f.write('aaa')
                pos = f.tell()
            with self.open(filename, 'rb') as f:
                self.assertEqual(f.read(), 'aaa'.encode(charset))

            with self.open(filename, 'a', encoding=charset) as f:
                f.write('xxx')
            with self.open(filename, 'rb') as f:
                self.assertEqual(f.read(), 'aaaxxx'.encode(charset))

    def test_seek_bom(self):
        # Same test, but when seeking manually
        filename = support.TESTFN
        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
            with self.open(filename, 'w', encoding=charset) as f:
                f.write('aaa')
                pos = f.tell()
            with self.open(filename, 'r+', encoding=charset) as f:
                f.seek(pos)
                f.write('zzz')
                f.seek(0)
                f.write('bbb')
            with self.open(filename, 'rb') as f:
                self.assertEqual(f.read(), 'bbbzzz'.encode(charset))

    def test_errors_property(self):
        with self.open(support.TESTFN, "w") as f:
            self.assertEqual(f.errors, "strict")
        with self.open(support.TESTFN, "w", errors="replace") as f:
            self.assertEqual(f.errors, "replace")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_threads_write(self):
        # Issue6750: concurrent writes could duplicate data
        event = threading.Event()
        with self.open(support.TESTFN, "w", buffering=1) as f:
            def run(n):
                text = "Thread%03d\n" % n
                # All threads block here and are released together.
                event.wait()
                f.write(text)
            threads = [threading.Thread(target=run, args=(x,))
                       for x in range(20)]
            for t in threads:
                t.start()
            time.sleep(0.02)
            event.set()
            for t in threads:
                t.join()
        with self.open(support.TESTFN) as f:
            content = f.read()
            for n in range(20):
                self.assertEqual(content.count("Thread%03d\n" % n), 1)
2109
class CTextIOWrapperTest(TextIOWrapperTest):
    # Extra checks on top of the shared TextIOWrapperTest suite.

    def test_initialization(self):
        # After a failed __init__ the object must refuse to read.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        for expected_exc, bad_newline in ((TypeError, 42),
                                          (ValueError, 'xyzzy')):
            self.assertRaises(expected_exc, wrapper.__init__, buffered,
                              newline=bad_newline)
            self.assertRaises(ValueError, wrapper.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        raw_file = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(raw_file)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        # Self-reference: only the cycle collector can free the wrapper.
        wrapper.x = wrapper
        ref = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertTrue(ref() is None, ref)
        with self.open(support.TESTFN, "rb") as stream:
            self.assertEqual(stream.read(), b"456def")
2136
class PyTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapperTest with no additional test methods of its own."""
2139
2140
class IncrementalNewlineDecoderTest(unittest.TestCase):
    """Checks for IncrementalNewlineDecoder.

    The decoder class under test is read from
    ``self.IncrementalNewlineDecoder``; test_main() sets that attribute on
    the "C"- and "Py"-prefixed subclasses before the tests are run.
    """

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def _check_decode(b, s, **kwargs):
            # We exercise getstate() / setstate() as well as decode():
            # decoding again after restoring the saved state must give
            # the same result as the first time.
            state = decoder.getstate()
            self.assertEqual(decoder.decode(b, **kwargs), s)
            decoder.setstate(state)
            self.assertEqual(decoder.decode(b, **kwargs), s)

        _check_decode(b'\xe8\xa2\x88', "\u8888")

        # The same character fed one byte at a time: nothing is produced
        # until the final byte arrives.
        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        # A truncated sequence is an error once the input is final.
        _check_decode(b'\xe8', "")
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        _check_decode(b'\n', "\n")
        # A lone '\r' is held back until the next chunk (or final=True)
        # shows whether it starts a '\r\n' pair.
        _check_decode(b'\r', "")
        _check_decode(b'', "\n", final=True)
        _check_decode(b'\r', "\n", final=True)

        _check_decode(b'\r', "")
        _check_decode(b'a', "\na")

        _check_decode(b'\r\r\n', "\n\n")
        _check_decode(b'\r', "")
        _check_decode(b'\r', "\n")
        _check_decode(b'\na', "\na")

        _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
        _check_decode(b'\xe8\xa2\x88', "\u8888")
        _check_decode(b'\n', "\n")
        _check_decode(b'\xe8\xa2\x88\r', "\u8888")
        _check_decode(b'\n', "\n")

    def check_newline_decoding(self, decoder, encoding):
        # Feed ``decoder`` one unit at a time and check both the translated
        # output and the ``newlines`` attribute after each step.
        # ``encoding`` is None when the decoder takes str input directly.
        result = []
        if encoding is not None:
            encoder = codecs.getincrementalencoder(encoding)()
            def _decode_bytewise(s):
                # Decode one byte at a time
                for b in encoder.encode(s):
                    result.append(decoder.decode(bytes([b])))
        else:
            encoder = None
            def _decode_bytewise(s):
                # Decode one char at a time
                for c in s:
                    result.append(decoder.decode(c))
        self.assertEqual(decoder.newlines, None)
        _decode_bytewise("abc\n\r")
        self.assertEqual(decoder.newlines, '\n')
        _decode_bytewise("\nabc")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc")
        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
        # reset() must also forget the newline kinds seen so far.
        decoder.reset()
        data = "abc"
        if encoder is not None:
            encoder.reset()
            data = encoder.encode(data)
        self.assertEqual(decoder.decode(data), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc is not None:
                decoder = codecs.getincrementaldecoder(enc)()
            else:
                decoder = None
            decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
            self.check_newline_decoding(decoder, enc)
        decoder = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
        self.check_newline_decoding_utf8(decoder)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def _check(dec):
            # U+0D00 and U+0A00 are not newlines and must pass through
            # unchanged without being recorded in ``newlines``.
            self.assertEqual(dec.newlines, None)
            self.assertEqual(dec.decode("\u0D00"), "\u0D00")
            self.assertEqual(dec.newlines, None)
            self.assertEqual(dec.decode("\u0A00"), "\u0A00")
            self.assertEqual(dec.newlines, None)
        dec = self.IncrementalNewlineDecoder(None, translate=False)
        _check(dec)
        dec = self.IncrementalNewlineDecoder(None, translate=True)
        _check(dec)
2246
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoderTest with no additional test methods."""
2249
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoderTest with no additional test methods."""
Antoine Pitrou180a3362008-12-14 16:36:46 +00002252
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002253
# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002255
class MiscIOTest(unittest.TestCase):
    """Assorted checks of the io namespace and of objects returned by open().

    Subclasses set ``io`` to the module under test; the remaining
    attributes used here (``open``, ``IOBase``, ``BlockingIOError``, ...)
    are set on the "C"- and "Py"-prefixed subclasses by test_main().
    """

    def tearDown(self):
        support.unlink(support.TESTFN)

    def test___all__(self):
        # Every public name must exist and be of the expected kind.
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertIsNotNone(obj, name)
            if name == "open":
                continue
            elif "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase))

    def test_attributes(self):
        # The files are opened in ``with`` blocks so that a failing
        # assertion cannot leave a file object open.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        with self.open(support.TESTFN, "U") as f:
            self.assertEqual(f.name, support.TESTFN)
            self.assertEqual(f.buffer.name, support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode, "U")
            self.assertEqual(f.buffer.mode, "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        with self.open(support.TESTFN, "w+") as f:
            self.assertEqual(f.mode, "w+")
            self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
            self.assertEqual(f.buffer.raw.mode, "rb+")

            # A second file object sharing f's descriptor; closefd=False
            # so that closing g leaves the descriptor to f.
            with self.open(f.fileno(), "wb", closefd=False) as g:
                self.assertEqual(g.mode, "wb")
                self.assertEqual(g.raw.mode, "wb")
                self.assertEqual(g.name, f.fileno())
                self.assertEqual(g.raw.name, f.fileno())

    def test_io_after_close(self):
        # Once a file is closed, every I/O method must raise ValueError,
        # whatever mode and buffering it was opened with.
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            # peek, read1 and readinto are not available on every layer.
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            # Pass an argument of the right type for the file's mode, so
            # that the ValueError comes from the closed state.
            self.assertRaises(ValueError, f.write,
                              b"" if "b" in kwargs['mode'] else "")
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        self.assertRaises(TypeError, self.BlockingIOError)
        self.assertRaises(TypeError, self.BlockingIOError, 1)
        self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
        self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
        b = self.BlockingIOError(1, "")
        self.assertEqual(b.characters_written, 0)
        class C(str):
            pass
        c = C("")
        b = self.BlockingIOError(1, c)
        # Build a reference cycle through the exception and check that
        # the garbage collector is able to reclaim it.
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        self.assertIsInstance(self.IOBase, abc.ABCMeta)
        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        # ``abcmodule`` is any object exposing IOBase, RawIOBase,
        # BufferedIOBase and TextIOBase attributes.  Files opened raw,
        # buffered and in text mode must each be an instance of IOBase and
        # of exactly one of the three more specific ABCs.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "wb") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "w") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertIsInstance(f, abcmodule.TextIOBase)

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)
2391
class CMiscIOTest(MiscIOTest):
    """MiscIOTest run against the ``io`` module."""

    # Module object inspected by the inherited tests (see test___all__).
    io = io
2394
class PyMiscIOTest(MiscIOTest):
    """MiscIOTest run against the ``pyio`` module."""

    # Module object inspected by the inherited tests (see test___all__).
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002397
def test_main():
    # Test classes to run.  Those whose name starts with "C" or "Py" get
    # the matching implementation namespace attached below.
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,)

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    globs = globals()
    namespaces = {}
    for prefix, module in (("C", io), ("Py", pyio)):
        ns = dict((name, getattr(module, name)) for name in all_members)
        # Each mock is stored under its generic name but resolved to the
        # implementation-specific global, e.g. MockRawIO -> CMockRawIO.
        ns.update((x.__name__, globs[prefix + x.__name__]) for x in mocks)
        namespaces[prefix] = ns
    # Avoid turning open into a bound method.
    namespaces["Py"]["open"] = pyio.OpenWrapper

    for test in tests:
        test_name = test.__name__
        if test_name.startswith("C"):
            namespace = namespaces["C"]
        elif test_name.startswith("Py"):
            namespace = namespaces["Py"]
        else:
            # Implementation-independent test: nothing to attach.
            continue
        for name, obj in namespace.items():
            setattr(test, name, obj)

    support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002430
if __name__ == "__main__":
    # Run the whole suite when this file is executed as a script.
    test_main()