blob: 1b2d4916ff49263bd5d58701d327bc137a524708 [file] [log] [blame]
"""Unit tests for the io module."""

# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)

################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import threading
27import random
Guido van Rossum28524c72007-02-27 05:47:44 +000028import unittest
Benjamin Peterson59406a92009-03-26 17:10:29 +000029import warnings
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000030import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000031import abc
Georg Brandl1b37e872010-03-14 10:45:50 +000032from itertools import cycle, count
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000033from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000034from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000035
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000036import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000037import io # C implementation of io
38import _pyio as pyio # Python implementation of io
Guido van Rossum28524c72007-02-27 05:47:44 +000039
Guido van Rossuma9e20242007-03-08 00:43:48 +000040
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041def _default_chunk_size():
42 """Get the default TextIOWrapper chunk size"""
43 with open(__file__, "r", encoding="latin1") as f:
44 return f._CHUNK_SIZE
45
46
class MockRawIO:
    """Scripted stand-in for a raw stream.

    ``read_stack`` is a sequence of items handed out, in order, by read()
    and readinto().  A ``None`` item is returned as-is by both methods.
    Everything passed to write() is recorded in ``_write_stack`` and the
    number of read()/readinto() calls is counted in ``_reads``.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        self._reads = 0

    def read(self, n=None):
        """Return the next scripted item, or b"" once the script is empty.

        The size argument ``n`` is ignored.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted script means EOF; any other error is a real
            # bug and must propagate instead of being reported as b"".
            return b""

    def write(self, b):
        """Record a bytes copy of ``b`` and return ``len(b)``."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the next scripted item into ``buf``.

        Returns the number of bytes copied, 0 when the script is empty, or
        None when the next scripted item is None.  An item longer than
        ``buf`` is split: the part that does not fit stays at the head of
        the script for the next call.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            del self._read_stack[0]
            buf[:n] = data
            return n
        else:
            buf[:] = data[:max_len]
            self._read_stack[0] = data[max_len:]
            return max_len

    def truncate(self, pos=None):
        """Return ``pos`` unchanged; nothing is actually truncated."""
        return pos
105
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO combined with the C implementation's RawIOBase."""
108
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO combined with the pure-Python implementation's RawIOBase."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000111
Guido van Rossuma9e20242007-03-08 00:43:48 +0000112
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods report deliberately bogus results."""

    def write(self, b):
        # Report twice what the parent class reports as written.
        reported = super().write(b)
        return reported * 2

    def read(self, n=None):
        # Hand back the parent's result repeated twice.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        # Always a negative position.
        return -123

    def tell(self):
        # Always a negative position.
        return -456

    def readinto(self, buf):
        # Let the parent fill the buffer, then report five times its size.
        super().readinto(buf)
        capacity = len(buf)
        return capacity * 5
129
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO combined with the C implementation's RawIOBase."""
132
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO combined with the pure-Python RawIOBase."""
135
136
class CloseFailureIO(MockRawIO):
    """MockRawIO variant whose first close() call raises IOError."""

    # Class-level default; close() shadows it with an instance attribute.
    closed = 0

    def close(self):
        """Mark the object closed and raise IOError, once only.

        Calls made after the object is marked closed do nothing.
        """
        if self.closed:
            return
        self.closed = 1
        raise IOError
144
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO combined with the C implementation's RawIOBase."""
147
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO combined with the pure-Python RawIOBase."""
150
151
class MockFileIO:
    """Mixin that logs the size of every read made through it.

    Meant to sit in front of a real stream class in the MRO: each call is
    forwarded with super() and its outcome is appended to ``read_history``.
    """

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        """Forward to the next class; log len(result), or None for None."""
        chunk = super().read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        """Forward to the next class and log whatever it returned."""
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000167
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO layered over the C implementation's BytesIO."""
Guido van Rossuma9e20242007-03-08 00:43:48 +0000170
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO layered over the pure-Python implementation's BytesIO."""
173
174
class MockNonBlockWriterIO:
    """Writable stream mixin that can fake one blocked write.

    Written data accumulates in ``_write_stack``.  After block_on(char),
    the first write() whose data contains ``char`` stores only the bytes
    before it and raises ``self.BlockingIOError`` (an attribute that
    concrete subclasses must supply); the blocker is then cleared.
    """

    def __init__(self):
        self._blocker_char = None
        self._write_stack = []

    def pop_written(self):
        """Return all recorded data joined together and empty the record."""
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Record ``b`` and return its length, unless the blocker is hit.

        When the blocker occurs in the data, only the part before it is
        recorded and BlockingIOError is raised carrying that part's length.
        """
        payload = bytes(b)
        blocker = self._blocker_char
        if blocker:
            cut = payload.find(blocker)
            if cut != -1:
                # One-shot: the next write goes through unhindered.
                self._blocker_char = None
                self._write_stack.append(payload[:cut])
                raise self.BlockingIOError(0, "test blocking", cut)
        self._write_stack.append(payload)
        return len(payload)
213
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO bound to the C implementation of io."""

    # Exception class looked up as self.BlockingIOError by write().
    BlockingIOError = io.BlockingIOError
216
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO bound to the pure-Python implementation of io."""

    # Exception class looked up as self.BlockingIOError by write().
    BlockingIOError = pyio.BlockingIOError
219
Guido van Rossuma9e20242007-03-08 00:43:48 +0000220
Guido van Rossum28524c72007-02-27 05:47:44 +0000221class IOTest(unittest.TestCase):
222
Neal Norwitze7789b12008-03-24 06:18:09 +0000223 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000224 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000225
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000226 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000227 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000228
Guido van Rossum28524c72007-02-27 05:47:44 +0000229 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000230 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000231 f.truncate(0)
232 self.assertEqual(f.tell(), 5)
233 f.seek(0)
234
235 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000236 self.assertEqual(f.seek(0), 0)
237 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000238 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000239 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000240 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000241 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000242 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000243 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000244 self.assertEqual(f.seek(-1, 2), 13)
245 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000246
Guido van Rossum87429772007-04-10 21:06:59 +0000247 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000248 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000249 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000250
Guido van Rossum9b76da62007-04-11 01:09:03 +0000251 def read_ops(self, f, buffered=False):
252 data = f.read(5)
253 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000254 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000255 self.assertEqual(f.readinto(data), 5)
256 self.assertEqual(data, b" worl")
257 self.assertEqual(f.readinto(data), 2)
258 self.assertEqual(len(data), 5)
259 self.assertEqual(data[:2], b"d\n")
260 self.assertEqual(f.seek(0), 0)
261 self.assertEqual(f.read(20), b"hello world\n")
262 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000263 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000264 self.assertEqual(f.seek(-6, 2), 6)
265 self.assertEqual(f.read(5), b"world")
266 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000267 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000268 self.assertEqual(f.seek(-6, 1), 5)
269 self.assertEqual(f.read(5), b" worl")
270 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000271 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000272 if buffered:
273 f.seek(0)
274 self.assertEqual(f.read(), b"hello world\n")
275 f.seek(6)
276 self.assertEqual(f.read(), b"world\n")
277 self.assertEqual(f.read(), b"")
278
Guido van Rossum34d69e52007-04-10 20:08:41 +0000279 LARGE = 2**31
280
Guido van Rossum53807da2007-04-10 19:01:47 +0000281 def large_file_ops(self, f):
282 assert f.readable()
283 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000284 self.assertEqual(f.seek(self.LARGE), self.LARGE)
285 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000286 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000287 self.assertEqual(f.tell(), self.LARGE + 3)
288 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000289 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000290 self.assertEqual(f.tell(), self.LARGE + 2)
291 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000292 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000293 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000294 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
295 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000296 self.assertEqual(f.read(2), b"x")
297
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000298 def test_invalid_operations(self):
299 # Try writing on a file opened in read mode and vice-versa.
300 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000301 with self.open(support.TESTFN, mode) as fp:
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000302 self.assertRaises(IOError, fp.read)
303 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000304 with self.open(support.TESTFN, "rb") as fp:
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000305 self.assertRaises(IOError, fp.write, b"blah")
306 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000307 with self.open(support.TESTFN, "r") as fp:
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000308 self.assertRaises(IOError, fp.write, "blah")
309 self.assertRaises(IOError, fp.writelines, ["blah\n"])
310
Guido van Rossum28524c72007-02-27 05:47:44 +0000311 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000312 with self.open(support.TESTFN, "wb", buffering=0) as f:
313 self.assertEqual(f.readable(), False)
314 self.assertEqual(f.writable(), True)
315 self.assertEqual(f.seekable(), True)
316 self.write_ops(f)
317 with self.open(support.TESTFN, "rb", buffering=0) as f:
318 self.assertEqual(f.readable(), True)
319 self.assertEqual(f.writable(), False)
320 self.assertEqual(f.seekable(), True)
321 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000322
Guido van Rossum87429772007-04-10 21:06:59 +0000323 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000324 with self.open(support.TESTFN, "wb") as f:
325 self.assertEqual(f.readable(), False)
326 self.assertEqual(f.writable(), True)
327 self.assertEqual(f.seekable(), True)
328 self.write_ops(f)
329 with self.open(support.TESTFN, "rb") as f:
330 self.assertEqual(f.readable(), True)
331 self.assertEqual(f.writable(), False)
332 self.assertEqual(f.seekable(), True)
333 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000334
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000335 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000336 with self.open(support.TESTFN, "wb") as f:
337 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
338 with self.open(support.TESTFN, "rb") as f:
339 self.assertEqual(f.readline(), b"abc\n")
340 self.assertEqual(f.readline(10), b"def\n")
341 self.assertEqual(f.readline(2), b"xy")
342 self.assertEqual(f.readline(4), b"zzy\n")
343 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000344 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000345 self.assertRaises(TypeError, f.readline, 5.3)
346 with self.open(support.TESTFN, "r") as f:
347 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000348
Guido van Rossum28524c72007-02-27 05:47:44 +0000349 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000350 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000351 self.write_ops(f)
352 data = f.getvalue()
353 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000354 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000355 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000356
Guido van Rossum53807da2007-04-10 19:01:47 +0000357 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000358 # On Windows and Mac OSX this test comsumes large resources; It takes
359 # a long time to build the >2GB file and takes >2GB of disk space
360 # therefore the resource must be enabled to run this test.
361 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000362 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000363 print("\nTesting large file ops skipped on %s." % sys.platform,
364 file=sys.stderr)
365 print("It requires %d bytes and a long time." % self.LARGE,
366 file=sys.stderr)
367 print("Use 'regrtest.py -u largefile test_io' to run it.",
368 file=sys.stderr)
369 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000370 with self.open(support.TESTFN, "w+b", 0) as f:
371 self.large_file_ops(f)
372 with self.open(support.TESTFN, "w+b") as f:
373 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000374
375 def test_with_open(self):
376 for bufsize in (0, 1, 100):
377 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000378 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000379 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000380 self.assertEqual(f.closed, True)
381 f = None
382 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000383 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000384 1/0
385 except ZeroDivisionError:
386 self.assertEqual(f.closed, True)
387 else:
388 self.fail("1/0 didn't raise an exception")
389
Antoine Pitrou08838b62009-01-21 00:55:13 +0000390 # issue 5008
391 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000392 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000393 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000394 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000395 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000396 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000397 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000398 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000399 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000400
Guido van Rossum87429772007-04-10 21:06:59 +0000401 def test_destructor(self):
402 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000403 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000404 def __del__(self):
405 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000406 try:
407 f = super().__del__
408 except AttributeError:
409 pass
410 else:
411 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000412 def close(self):
413 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000414 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000415 def flush(self):
416 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000417 super().flush()
418 f = MyFileIO(support.TESTFN, "wb")
419 f.write(b"xxx")
420 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000421 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000422 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000423 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000424 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000425
426 def _check_base_destructor(self, base):
427 record = []
428 class MyIO(base):
429 def __init__(self):
430 # This exercises the availability of attributes on object
431 # destruction.
432 # (in the C version, close() is called by the tp_dealloc
433 # function, not by __del__)
434 self.on_del = 1
435 self.on_close = 2
436 self.on_flush = 3
437 def __del__(self):
438 record.append(self.on_del)
439 try:
440 f = super().__del__
441 except AttributeError:
442 pass
443 else:
444 f()
445 def close(self):
446 record.append(self.on_close)
447 super().close()
448 def flush(self):
449 record.append(self.on_flush)
450 super().flush()
451 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000452 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000453 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000454 self.assertEqual(record, [1, 2, 3])
455
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000456 def test_IOBase_destructor(self):
457 self._check_base_destructor(self.IOBase)
458
459 def test_RawIOBase_destructor(self):
460 self._check_base_destructor(self.RawIOBase)
461
462 def test_BufferedIOBase_destructor(self):
463 self._check_base_destructor(self.BufferedIOBase)
464
465 def test_TextIOBase_destructor(self):
466 self._check_base_destructor(self.TextIOBase)
467
Guido van Rossum87429772007-04-10 21:06:59 +0000468 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000469 with self.open(support.TESTFN, "wb") as f:
470 f.write(b"xxx")
471 with self.open(support.TESTFN, "rb") as f:
472 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000473
Guido van Rossumd4103952007-04-12 05:44:49 +0000474 def test_array_writes(self):
475 a = array.array('i', range(10))
Antoine Pitrouc3b39242009-01-03 16:59:18 +0000476 n = len(a.tostring())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000477 with self.open(support.TESTFN, "wb", 0) as f:
478 self.assertEqual(f.write(a), n)
479 with self.open(support.TESTFN, "wb") as f:
480 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000481
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000482 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000483 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000484 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000485
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000486 def test_read_closed(self):
487 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000488 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000489 with self.open(support.TESTFN, "r") as f:
490 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000491 self.assertEqual(file.read(), "egg\n")
492 file.seek(0)
493 file.close()
494 self.assertRaises(ValueError, file.read)
495
496 def test_no_closefd_with_filename(self):
497 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000498 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000499
500 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000501 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000502 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000503 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000504 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000505 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000506 self.assertEqual(file.buffer.raw.closefd, False)
507
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000508 def test_garbage_collection(self):
509 # FileIO objects are collected, and collecting them flushes
510 # all data to disk.
511 f = self.FileIO(support.TESTFN, "wb")
512 f.write(b"abcxxx")
513 f.f = f
514 wr = weakref.ref(f)
515 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000516 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000517 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000518 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000519 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000520
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000521 def test_unbounded_file(self):
522 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
523 zero = "/dev/zero"
524 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000525 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000526 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000527 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000528 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000529 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000530 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000531 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000532 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000533 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000534 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000535 self.assertRaises(OverflowError, f.read)
536
class CIOTest(IOTest):
    """IOTest variant for the C implementation of io."""
Guido van Rossuma9e20242007-03-08 00:43:48 +0000539
class PyIOTest(IOTest):
    """IOTest variant for the pure-Python implementation of io."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000542
Guido van Rossuma9e20242007-03-08 00:43:48 +0000543
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    #
    # This is a mixin: the concrete test class must provide ``self.tp`` (the
    # buffered class under test) plus the ``self.MockRawIO`` and
    # ``self.CloseFailureIO`` raw-stream classes used below.

    def test_detach(self):
        raw = self.MockRawIO()
        buf = self.tp(raw)
        self.assertIs(buf.detach(), raw)
        # A second detach() must fail.
        self.assertRaises(ValueError, buf.detach)

    def test_fileno(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        self.assertEqual(42, bufio.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Invalid whence
        self.assertRaises(ValueError, bufio.seek, 0, -1)
        # 9 rather than 3: on some platforms 3 is a valid whence value
        # (os.SEEK_DATA), so it would not be rejected there.
        self.assertRaises(ValueError, bufio.seek, 0, 9)

    def test_override_destructor(self):
        # Destroying a subclass instance must run __del__, then close(),
        # then (for writable objects only) flush().
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        if writable:
            self.assertEqual(record, [1, 2, 3])
        else:
            self.assertEqual(record, [1, 2])

    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        def _with():
            with bufio:
                pass
        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            self.tp(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception IOError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    def test_repr(self):
        # repr() shows the class name, plus the raw stream's name when set.
        raw = self.MockRawIO()
        b = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(b), "<%s>" % clsname)
        raw.name = "dummy"
        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
635
Guido van Rossum78892e42007-04-06 17:31:18 +0000636
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000637class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
638 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000639
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000640 def test_constructor(self):
641 rawio = self.MockRawIO([b"abc"])
642 bufio = self.tp(rawio)
643 bufio.__init__(rawio)
644 bufio.__init__(rawio, buffer_size=1024)
645 bufio.__init__(rawio, buffer_size=16)
646 self.assertEquals(b"abc", bufio.read())
647 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
648 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
649 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
650 rawio = self.MockRawIO([b"abc"])
651 bufio.__init__(rawio)
652 self.assertEquals(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000653
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000654 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000655 for arg in (None, 7):
656 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
657 bufio = self.tp(rawio)
658 self.assertEquals(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000659 # Invalid args
660 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000661
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000662 def test_read1(self):
663 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
664 bufio = self.tp(rawio)
665 self.assertEquals(b"a", bufio.read(1))
666 self.assertEquals(b"b", bufio.read1(1))
667 self.assertEquals(rawio._reads, 1)
668 self.assertEquals(b"c", bufio.read1(100))
669 self.assertEquals(rawio._reads, 1)
670 self.assertEquals(b"d", bufio.read1(100))
671 self.assertEquals(rawio._reads, 2)
672 self.assertEquals(b"efg", bufio.read1(100))
673 self.assertEquals(rawio._reads, 3)
674 self.assertEquals(b"", bufio.read1(100))
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000675 self.assertEquals(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000676 # Invalid args
677 self.assertRaises(ValueError, bufio.read1, -1)
678
679 def test_readinto(self):
680 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
681 bufio = self.tp(rawio)
682 b = bytearray(2)
683 self.assertEquals(bufio.readinto(b), 2)
684 self.assertEquals(b, b"ab")
685 self.assertEquals(bufio.readinto(b), 2)
686 self.assertEquals(b, b"cd")
687 self.assertEquals(bufio.readinto(b), 2)
688 self.assertEquals(b, b"ef")
689 self.assertEquals(bufio.readinto(b), 1)
690 self.assertEquals(b, b"gf")
691 self.assertEquals(bufio.readinto(b), 0)
692 self.assertEquals(b, b"gf")
693
def test_readlines(self):
    # A fresh buffered reader is built for every call so that each
    # readlines() variant starts from the beginning of the data.
    def bufio():
        rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
        return self.tp(rawio)
    # No hint and a None hint both return every line; a hint of 5 stops
    # after the first two lines.
    self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
    self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
    self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
701
def test_buffering(self):
    data = b"abcdefghi"
    dlen = len(data)

    # Each entry is [buffer size, sizes passed to bufio.read(), expected
    # rawio.read_history after those reads].
    tests = [
        [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
        [ 100, [ 3, 3, 3],     [ dlen ]    ],
        [ 4,   [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
    ]

    for bufsize, buf_read_sizes, raw_read_sizes in tests:
        rawio = self.MockFileIO(data)
        bufio = self.tp(rawio, buffer_size=bufsize)
        pos = 0
        for nbytes in buf_read_sizes:
            # Every buffered read must return the next slice of the data.
            self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
            pos += nbytes
        # this is mildly implementation-dependent
        self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000721
def test_read_non_blocking(self):
    # Inject some None's in there to simulate EWOULDBLOCK
    rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
    bufio = self.tp(rawio)

    # A read interrupted by a None returns the data gathered so far.
    self.assertEqual(b"abcd", bufio.read(6))
    self.assertEqual(b"e", bufio.read(1))
    self.assertEqual(b"fg", bufio.read())
    self.assertEqual(b"", bufio.peek(1))
    # With nothing buffered and the raw stream reporting None, read()
    # returns None; the following read() returns b"".
    self.assertIsNone(bufio.read())
    self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000733
def test_read_past_eof(self):
    # Asking for far more bytes than the stream holds returns everything
    # that is available.
    rawio = self.MockRawIO((b"abc", b"d", b"efg"))
    bufio = self.tp(rawio)

    self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000739
def test_read_all(self):
    # read() without a size returns all the chunks joined together.
    rawio = self.MockRawIO((b"abc", b"d", b"efg"))
    bufio = self.tp(rawio)

    self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000745
def test_threads(self):
    # Twenty threads read from one shared buffered reader; afterwards every
    # byte value must have been seen exactly as often as it was written.
    try:
        # Write out many bytes with exactly the same number of 0's,
        # 1's... 255's. This will help us check that concurrent reading
        # doesn't duplicate or forget contents.
        repeat = 1000
        values = list(range(256)) * repeat
        random.shuffle(values)
        payload = bytes(bytearray(values))
        with self.open(support.TESTFN, "wb") as out:
            out.write(payload)
        with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            errors = []
            results = []
            def reader():
                try:
                    # Intra-buffer read then buffer-flushing read
                    for size in cycle([1, 19]):
                        chunk = bufio.read(size)
                        if not chunk:
                            break
                        # list.append() is atomic
                        results.append(chunk)
                except Exception as exc:
                    # Record the failure for the main thread, then let it
                    # propagate in this thread as well.
                    errors.append(exc)
                    raise
            threads = [threading.Thread(target=reader) for _ in range(20)]
            for worker in threads:
                worker.start()
            time.sleep(0.02) # yield
            for worker in threads:
                worker.join()
            self.assertFalse(errors,
                "the following exceptions were caught: %r" % errors)
            combined = b''.join(results)
            for value in range(256):
                needle = bytes(bytearray([value]))
                self.assertEqual(combined.count(needle), repeat)
    finally:
        support.unlink(support.TESTFN)
787
def test_misbehaved_io(self):
    # Both seek() and tell() on a buffered object wrapping the misbehaving
    # raw mock are expected to raise IOError.
    rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
    bufio = self.tp(rawio)
    with self.assertRaises(IOError):
        bufio.seek(0)
    with self.assertRaises(IOError):
        bufio.tell()
793
class CBufferedReaderTest(BufferedReaderTest):
    # Runs the inherited reader tests with tp bound to io.BufferedReader and
    # adds checks that only apply to that implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        expected = (OverflowError, MemoryError, ValueError)
        self.assertRaises(expected, bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        # Each rejected buffer size must be followed by a read() that
        # raises ValueError as well.
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        with self.assertRaises(IOError):
            bufio.read(10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        rawio = self.FileIO(support.TESTFN, "w+b")
        bufio = self.tp(rawio)
        # Self-reference: only the cycle collector can reclaim the object.
        bufio.f = bufio
        wr = weakref.ref(bufio)
        del bufio
        support.gc_collect()
        self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000834
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the inherited BufferedReaderTest cases with tp bound to
    # pyio.BufferedReader; no implementation-specific tests are added.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000837
Guido van Rossuma9e20242007-03-08 00:43:48 +0000838
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Implementation-independent tests for buffered writers.  Subclasses
    # bind `tp` to the class under test; the mocks and open() are reached
    # through attributes of self.
    write_mode = "wb"

    def test_constructor(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Re-running __init__ with valid sizes keeps the object usable.
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        # Zero and negative buffer sizes are rejected.
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # detach() must push the pending data to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # intermediate_func(bufio) is called after every write so callers
        # can interleave other operations (flush, seek, truncate, ...).
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            # Never write past the end of the data.
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Absolute seeks away from and back to the current position.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        # Relative seeks, then an absolute seek back to the start position.
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        # Seeking away must make the two rewritten bytes visible in raw.
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_destructor(self):
        # Dropping the last reference must flush the pending data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            # The position is expected to stay at 6 after truncating to 3.
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            # Pre-cut the data into chunks the worker threads will consume.
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # The chunk order is arbitrary, so only per-value counts are
            # compared.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing a third positional argument must emit exactly one
        # DeprecationWarning with the expected message.
        with support.check_warnings() as w:
            warnings.simplefilter("always", DeprecationWarning)
            self.tp(self.MockRawIO(), 8, 12)
            self.assertEqual(len(w.warnings), 1)
            warning = w.warnings[0]
            self.assertTrue(warning.category is DeprecationWarning)
            self.assertEqual(str(warning.message),
                             "max_buffer_size is deprecated")
1057
1058
class CBufferedWriterTest(BufferedWriterTest):
    # Runs the inherited writer tests with tp bound to io.BufferedWriter and
    # adds checks that only apply to that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        expected = (OverflowError, MemoryError, ValueError)
        self.assertRaises(expected, bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Each rejected buffer size must be followed by a write() that
        # raises ValueError as well.
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        rawio = self.FileIO(support.TESTFN, "w+b")
        bufio = self.tp(rawio)
        bufio.write(b"123xxx")
        # Self-reference: only the cycle collector can reclaim the object.
        bufio.x = bufio
        wr = weakref.ref(bufio)
        del bufio
        support.gc_collect()
        self.assertTrue(wr() is None, wr)
        with self.open(support.TESTFN, "rb") as stored:
            self.assertEqual(stored.read(), b"123xxx")
1096
1097
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the inherited BufferedWriterTest cases with tp bound to
    # pyio.BufferedWriter; no implementation-specific tests are added.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001100
class BufferedRWPairTest(unittest.TestCase):
    # Implementation-independent tests for a buffered reader/writer pair.
    # Subclasses bind `tp` to the class under test, which is constructed as
    # tp(reader, writer).

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_detach(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_deprecation(self):
        # Passing a fourth positional argument must emit exactly one
        # DeprecationWarning with the expected message.
        with support.check_warnings() as w:
            warnings.simplefilter("always", DeprecationWarning)
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
            self.assertEqual(len(w.warnings), 1)
            warning = w.warnings[0]
            self.assertTrue(warning.category is DeprecationWarning)
            self.assertEqual(str(warning.message),
                             "max_buffer_size is deprecated")

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        # None must behave like an omitted size.
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        # A fresh pair is built for every call so that each readlines()
        # variant starts from the beginning of the data.
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        # A None hint must behave like an omitted hint.
        self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        # Each flush() must deliver one chunk to the writer side.
        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # peek() may return more than requested, hence startswith(); it
        # must not consume the data, which the read() below verifies.
        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        # Raw mock whose isatty() answer is chosen at construction time.
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # The pair reports a tty as soon as either side does.
        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001223
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the inherited BufferedRWPairTest cases with tp bound to
    # io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001226
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the inherited BufferedRWPairTest cases with tp bound to
    # pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001229
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001230
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Tests for buffered objects that both read and write.  The reader and
    # writer test suites are inherited; methods that both parents define are
    # overridden below to run the two versions explicitly.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        # The read must push the buffered writes to the raw stream first.
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        # Overwrite bytes 4..7, then re-read the whole stream.
        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        # A float offset is rejected.
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # read_func(bufio[, n]) must return the next n bytes (or the rest of
        # the stream when n is omitted) and advance the position by that
        # amount; it lets the same scenario run against different read APIs.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        # Modify the raw stream behind the buffered object's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # With no size given, use a buffer far larger than the data.
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            # peek() does not advance the position, so do it by hand.
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)
        # Same again, but peeking at the byte that was just written.
        def _peek(bufio):
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek)

    def test_writes_and_reads(self):
        # Step back one byte and read it again after every write.
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            s = raw.getvalue()
            self.assertEqual(s,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)
1385
class CBufferedRandomTest(BufferedRandomTest):
    # Runs the inherited random-access tests with tp bound to
    # io.BufferedRandom and adds checks specific to that implementation.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        expected = (OverflowError, MemoryError, ValueError)
        self.assertRaises(expected, bufio.__init__, rawio, sys.maxsize)

    def test_garbage_collection(self):
        # Run the reader check first, then the writer check, on this class.
        for parent in (CBufferedReaderTest, CBufferedWriterTest):
            parent.test_garbage_collection(self)
1401 CBufferedWriterTest.test_garbage_collection(self)
1402
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the inherited BufferedRandomTest cases with tp bound to
    # pyio.BufferedRandom; no implementation-specific tests are added.
    tp = pyio.BufferedRandom
1405
1406
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001407# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1408# properties:
1409# - A single output character can correspond to many bytes of input.
1410# - The number of input bytes to complete the character can be
1411# undetermined until the last input byte is received.
1412# - The number of input bytes can vary depending on previous input.
1413# - A single input byte can correspond to many characters of output.
1414# - The number of output characters can be undetermined until the
1415# last input byte is received.
1416# - The number of output characters can vary depending on previous input.
1417
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    A stateful, buffering decoder for testing seek/tell behavior.

    The input is consumed as a sequence of words.  A word is either
    fixed-length (I bytes) or, when I is 0, variable-length and terminated
    by a period; in variable-length mode extra periods are ignored.
    Words are interpreted as follows:
      - 'i' followed by a number sets the input length I (capped at 99).
        With I set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length O (capped at 99).
      - Any other word is written to the output followed by a period.
        When O is non-zero the word is first padded with hyphens or
        truncated so that it is exactly O characters long; when O is 0
        it is output verbatim.
    I and O both start at 1.  A new I takes effect for the bytes that
    follow the word that set it.  EOF (final=True) also terminates the
    last word.
    """

    # Gate for lookupTestDecoder(): the codec is only resolvable while True.
    codecEnabled = False

    def __init__(self, errors='strict'):
        super().__init__(errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % (id(self),)

    def reset(self):
        """Return to the initial state: I = O = 1 and nothing buffered."""
        self.i = self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (pending bytes, flags) with flags = (I^1)*100 + (O^1)."""
        # XOR with 1 so that flags = 0 after reset()
        flags = (self.i ^ 1) * 100 + (self.o ^ 1)
        return bytes(self.buffer), flags

    def setstate(self, state):
        """Restore a (pending bytes, flags) pair produced by getstate()."""
        pending, flags = state
        self.buffer = bytearray(pending)
        hundreds, units = divmod(flags, 100)
        self.i = hundreds ^ 1
        self.o = units ^ 1

    def decode(self, input, final=False):
        """Feed the bytes of *input* and return the text completed so far."""
        pieces = []
        for byte in input:
            if self.i != 0:
                # fixed-length, terminate after self.i bytes
                self.buffer.append(byte)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
            elif byte != ord('.'):
                # variable-length: keep collecting until a period shows up
                self.buffer.append(byte)
            elif self.buffer:
                # a period ends the word; periods with nothing buffered
                # are ignored
                pieces.append(self.process_word())
        if final and self.buffer:
            # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Interpret the buffered word, clear the buffer, return its output."""
        leader = self.buffer[0]
        if leader == ord('i'):
            # set input length
            self.i = min(99, int(self.buffer[1:] or 0))
            result = ''
        elif leader == ord('o'):
            # set output length
            self.o = min(99, int(self.buffer[1:] or 0))
            result = ''
        else:
            text = self.buffer.decode('ascii')
            if len(text) < self.o:
                # pad out with hyphens (any excess is cut off just below)
                text += '-' * self.o
            if self.o:
                # truncate to output length
                text = text[:self.o]
            result = text + '.'
        self.buffer = bytearray()
        return result

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: resolve 'test_decoder' while enabled."""
        if not cls.codecEnabled or name != 'test_decoder':
            return None
        latin1 = codecs.lookup('latin-1')
        return codecs.CodecInfo(
            name='test_decoder', encode=latin1.encode, decode=None,
            incrementalencoder=None,
            streamreader=None, streamwriter=None,
            incrementaldecoder=cls)
1502
# Register the lookup function of the decoder above with the codecs
# machinery.  The lookup only answers while
# StatefulIncrementalDecoder.codecEnabled is true; the flag is off by
# default, and tests that need the codec switch it on themselves.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1506
1507
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, value passed as `final`, expected output).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases, each with a fresh decoder.
        for data, eof, expected in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(data, eof), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', final=True), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001550
class TextIOWrapperTest(unittest.TestCase):
    # Tests for TextIOWrapper.  The I/O classes under test are always
    # reached through attributes of the test case (self.BytesIO,
    # self.BufferedReader, self.BufferedWriter, self.TextIOWrapper,
    # self.open, ...), so the same tests can be run against more than one
    # implementation.  This class does not define those attributes itself.

    def setUp(self):
        # Mixed line endings, and the same text with every ending normalized
        # to "\n"; both are used by the test_issue* tests.
        self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
        self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def test_constructor(self):
        # Calling __init__ again on a live wrapper must apply the new
        # encoding and line_buffering; bad newline values are rejected.
        r = self.BytesIO(b"\xc3\xa9\n\n")
        b = self.BufferedReader(r, 1000)
        t = self.TextIOWrapper(b)
        t.__init__(b, encoding="latin1", newline="\r\n")
        self.assertEqual(t.encoding, "latin1")
        self.assertEqual(t.line_buffering, False)
        t.__init__(b, encoding="utf8", line_buffering=True)
        self.assertEqual(t.encoding, "utf8")
        self.assertEqual(t.line_buffering, True)
        self.assertEqual("\xe9\n", t.readline())
        self.assertRaises(TypeError, t.__init__, b, newline=42)
        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')

    def test_detach(self):
        # detach() returns the underlying buffer ...
        r = self.BytesIO()
        b = self.BufferedWriter(r)
        t = self.TextIOWrapper(b)
        self.assertIs(t.detach(), b)

        # ... pushes pending text down to the raw stream, and cannot be
        # called a second time.
        t = self.TextIOWrapper(b, encoding="ascii")
        t.write("howdy")
        self.assertFalse(r.getvalue())
        t.detach()
        self.assertEqual(r.getvalue(), b"howdy")
        self.assertRaises(ValueError, t.detach)

    def test_repr(self):
        # repr() shows the encoding, plus the raw stream's name once it has
        # one (str or bytes).
        raw = self.BytesIO("hello".encode("utf-8"))
        b = self.BufferedReader(raw)
        t = self.TextIOWrapper(b, encoding="utf-8")
        modname = self.TextIOWrapper.__module__
        self.assertEqual(repr(t),
                         "<%s.TextIOWrapper encoding='utf-8'>" % modname)
        raw.name = "dummy"
        self.assertEqual(repr(t),
                         "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
        raw.name = b"dummy"
        self.assertEqual(repr(t),
                         "<%s.TextIOWrapper name=b'dummy' encoding='utf-8'>" % modname)

    def test_line_buffering(self):
        r = self.BytesIO()
        b = self.BufferedWriter(r, 1000)
        t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
        t.write("X")
        self.assertEqual(r.getvalue(), b"")  # No flush happened
        t.write("Y\nZ")
        self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed
        t.write("A\rB")
        self.assertEqual(r.getvalue(), b"XY\nZA\rB")

    def test_encoding(self):
        # Check the encoding attribute is always set, and valid
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="utf8")
        self.assertEqual(t.encoding, "utf8")
        t = self.TextIOWrapper(b)
        self.assertIsNotNone(t.encoding)
        codecs.lookup(t.encoding)

    def test_encoding_errors_reading(self):
        # (1) default
        b = self.BytesIO(b"abc\n\xff\n")
        t = self.TextIOWrapper(b, encoding="ascii")
        self.assertRaises(UnicodeError, t.read)
        # (2) explicit strict
        b = self.BytesIO(b"abc\n\xff\n")
        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
        self.assertRaises(UnicodeError, t.read)
        # (3) ignore
        b = self.BytesIO(b"abc\n\xff\n")
        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
        self.assertEqual(t.read(), "abc\n\n")
        # (4) replace
        b = self.BytesIO(b"abc\n\xff\n")
        t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
        self.assertEqual(t.read(), "abc\n\ufffd\n")

    def test_encoding_errors_writing(self):
        # (1) default
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="ascii")
        self.assertRaises(UnicodeError, t.write, "\xff")
        # (2) explicit strict
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
        self.assertRaises(UnicodeError, t.write, "\xff")
        # (3) ignore
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
                               newline="\n")
        t.write("abc\xffdef\n")
        t.flush()
        self.assertEqual(b.getvalue(), b"abcdef\n")
        # (4) replace
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
                               newline="\n")
        t.write("abc\xffdef\n")
        t.flush()
        self.assertEqual(b.getvalue(), b"abc?def\n")

    def test_newlines(self):
        input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]

        # Each entry is [newline argument, lines expected when reading].
        tests = [
            [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
            [ '', input_lines ],
            [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
            [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
            [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
        ]
        encodings = (
            'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )

        # Try a range of buffer sizes to test the case where \r is the last
        # character in TextIOWrapper._pending_line.
        for encoding in encodings:
            data = ''.join(input_lines).encode(encoding)
            for do_reads in (False, True):
                for bufsize in range(1, 10):
                    for newline, exp_lines in tests:
                        bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                        textio = self.TextIOWrapper(bufio, newline=newline,
                                                    encoding=encoding)
                        if do_reads:
                            # Mix read(2) with readline(): every line is
                            # fetched as two characters plus the remainder.
                            got_lines = []
                            while True:
                                c2 = textio.read(2)
                                if c2 == '':
                                    break
                                self.assertEqual(len(c2), 2)
                                got_lines.append(c2 + textio.readline())
                        else:
                            got_lines = list(textio)

                        for got_line, exp_line in zip(got_lines, exp_lines):
                            self.assertEqual(got_line, exp_line)
                        self.assertEqual(len(got_lines), len(exp_lines))

    def test_newlines_input(self):
        # For every newline mode, readlines() must split as expected and
        # read() after seek(0) must return the same text joined together.
        testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
        normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
        for newline, expected in [
            (None, normalized.decode("ascii").splitlines(True)),
            ("", testdata.decode("ascii").splitlines(True)),
            ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
            ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
            ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
            ]:
            buf = self.BytesIO(testdata)
            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
            self.assertEqual(txt.readlines(), expected)
            txt.seek(0)
            self.assertEqual(txt.read(), "".join(expected))

    def test_newlines_output(self):
        # Maps the newline argument to the bytes expected after writing
        # "AAA\nBBB\nCCC\nX\rY\r\nZ" in three pieces.
        testdict = {
            "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
            "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
            "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
            "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
            }
        # newline=None is expected to behave like newline=os.linesep.
        tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
        for newline, expected in tests:
            buf = self.BytesIO()
            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
            txt.write("AAA\nB")
            txt.write("BB\nCCC\n")
            txt.write("X\rY\r\nZ")
            txt.flush()
            self.assertEqual(buf.closed, False)
            self.assertEqual(buf.getvalue(), expected)

    def test_destructor(self):
        # Dropping the last reference to the wrapper must close the
        # underlying stream after the pending text has reached it.
        closed_values = []
        base = self.BytesIO
        class MyBytesIO(base):
            def close(self):
                closed_values.append(self.getvalue())
                base.close(self)
        b = MyBytesIO()
        t = self.TextIOWrapper(b, encoding="ascii")
        t.write("abc")
        del t
        support.gc_collect()
        self.assertEqual([b"abc"], closed_values)

    def test_override_destructor(self):
        # Destroying the object must call the overridden __del__, close()
        # and flush(), in that order.
        record = []
        class MyTextIO(self.TextIOWrapper):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        b = self.BytesIO()
        t = MyTextIO(b, encoding="ascii")
        del t
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            self.TextIOWrapper(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception IOError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    # Systematic tests of the text I/O API

    def test_basic_io(self):
        for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
            for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
                # `with` closes the file even when an assertion fails, so
                # tearDown() can always remove it.
                with self.open(support.TESTFN, "w+", encoding=enc) as f:
                    f._CHUNK_SIZE = chunksize
                    self.assertEqual(f.write("abc"), 3)
                with self.open(support.TESTFN, "r+", encoding=enc) as f:
                    f._CHUNK_SIZE = chunksize
                    self.assertEqual(f.tell(), 0)
                    self.assertEqual(f.read(), "abc")
                    cookie = f.tell()
                    self.assertEqual(f.seek(0), 0)
                    self.assertEqual(f.read(None), "abc")
                    f.seek(0)
                    self.assertEqual(f.read(2), "ab")
                    self.assertEqual(f.read(1), "c")
                    self.assertEqual(f.read(1), "")
                    self.assertEqual(f.read(), "")
                    self.assertEqual(f.tell(), cookie)
                    self.assertEqual(f.seek(0), 0)
                    self.assertEqual(f.seek(0, 2), cookie)
                    self.assertEqual(f.write("def"), 3)
                    self.assertEqual(f.seek(cookie), cookie)
                    self.assertEqual(f.read(), "def")
                    if enc.startswith("utf"):
                        self.multi_line_test(f, enc)

    def multi_line_test(self, f, enc):
        # Helper for test_basic_io: write lines of many different lengths,
        # remembering tell() before each one, then read them back and check
        # that both the positions and the lines match.  `enc` is not used.
        f.seek(0)
        f.truncate()
        sample = "s\xff\u0fff\uffff"
        wlines = []
        for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
            line = "".join(sample[i % len(sample)] for i in range(size)) + "\n"
            wlines.append((f.tell(), line))
            f.write(line)
        f.seek(0)
        rlines = []
        while True:
            pos = f.tell()
            line = f.readline()
            if not line:
                break
            rlines.append((pos, line))
        self.assertEqual(rlines, wlines)

    def test_telling(self):
        with self.open(support.TESTFN, "w+", encoding="utf8") as f:
            p0 = f.tell()
            f.write("\xff\n")
            p1 = f.tell()
            f.write("\xff\n")
            p2 = f.tell()
            f.seek(0)
            self.assertEqual(f.tell(), p0)
            self.assertEqual(f.readline(), "\xff\n")
            self.assertEqual(f.tell(), p1)
            self.assertEqual(f.readline(), "\xff\n")
            self.assertEqual(f.tell(), p2)
            f.seek(0)
            # tell() is expected to fail while the file is being iterated
            # and to work again once the iteration is over.
            for line in f:
                self.assertEqual(line, "\xff\n")
                self.assertRaises(IOError, f.tell)
            self.assertEqual(f.tell(), p2)

    def test_seeking(self):
        # Put a multi-byte character right after chunk_size - 2 ASCII
        # characters, then read up to it and across it.
        chunk_size = _default_chunk_size()
        prefix_size = chunk_size - 2
        u_prefix = "a" * prefix_size
        prefix = u_prefix.encode("utf-8")
        self.assertEqual(len(u_prefix), len(prefix))
        u_suffix = "\u8888\n"
        suffix = u_suffix.encode("utf-8")
        line = prefix + suffix
        with self.open(support.TESTFN, "wb") as f:
            f.write(line*2)
        with self.open(support.TESTFN, "r", encoding="utf-8") as f:
            s = f.read(prefix_size)
            self.assertEqual(s, str(prefix, "ascii"))
            self.assertEqual(f.tell(), prefix_size)
            self.assertEqual(f.readline(), u_suffix)

    def test_seeking_too(self):
        # Regression test for a specific bug
        data = b'\xe0\xbf\xbf\n'
        with self.open(support.TESTFN, "wb") as f:
            f.write(data)
        with self.open(support.TESTFN, "r", encoding="utf-8") as f:
            f._CHUNK_SIZE  # Just test that it exists
            f._CHUNK_SIZE = 2
            f.readline()
            f.tell()

    def test_seek_and_tell(self):
        #Test seek/tell using the StatefulIncrementalDecoder.
        # Make test faster by doing smaller seeks
        CHUNK_SIZE = 128

        def test_seek_and_tell_with_data(data, min_pos=0):
            """Tell/seek to various points within a data stream and ensure
            that the decoded data returned by read() is consistent."""
            with self.open(support.TESTFN, 'wb') as f:
                f.write(data)
            with self.open(support.TESTFN, encoding='test_decoder') as f:
                f._CHUNK_SIZE = CHUNK_SIZE
                decoded = f.read()

            for i in range(min_pos, len(decoded) + 1): # seek positions
                for j in [1, 5, len(decoded) - i]: # read lengths
                    with self.open(support.TESTFN,
                                   encoding='test_decoder') as f:
                        self.assertEqual(f.read(i), decoded[:i])
                        cookie = f.tell()
                        self.assertEqual(f.read(j), decoded[i:i + j])
                        f.seek(cookie)
                        self.assertEqual(f.read(), decoded[i:])

        # Enable the test decoder.
        StatefulIncrementalDecoder.codecEnabled = True

        # Run the tests.
        try:
            # Try each test case.
            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
                test_seek_and_tell_with_data(input)

            # Position each test case so that it crosses a chunk boundary.
            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
                offset = CHUNK_SIZE - len(input)//2
                prefix = b'.'*offset
                # Don't bother seeking into the prefix (takes too long).
                min_pos = offset*2
                test_seek_and_tell_with_data(prefix + input, min_pos)

        # Ensure our test decoder won't interfere with subsequent tests.
        finally:
            StatefulIncrementalDecoder.codecEnabled = False

    def test_encoded_writes(self):
        data = "1234567890"
        tests = ("utf-16",
                 "utf-16-le",
                 "utf-16-be",
                 "utf-32",
                 "utf-32-le",
                 "utf-32-be")
        for encoding in tests:
            buf = self.BytesIO()
            f = self.TextIOWrapper(buf, encoding=encoding)
            # Check if the BOM is written only once (see issue1753).
            f.write(data)
            f.write(data)
            f.seek(0)
            self.assertEqual(f.read(), data * 2)
            f.seek(0)
            self.assertEqual(f.read(), data * 2)
            self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))

    def test_unreadable(self):
        # Reading through a wrapper whose stream reports readable() == False
        # must raise IOError.
        class UnReadable(self.BytesIO):
            def readable(self):
                return False
        txt = self.TextIOWrapper(UnReadable())
        self.assertRaises(IOError, txt.read)

    def test_read_one_by_one(self):
        # "\r\n" must come back as a single "\n" even when reading one
        # character at a time.
        txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
        reads = ""
        while True:
            c = txt.read(1)
            if not c:
                break
            reads += c
        self.assertEqual(reads, "AA\nBB")

    def test_readlines(self):
        # readlines() with no argument, with None and with a size hint.
        txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
        self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
        txt.seek(0)
        self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
        txt.seek(0)
        self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])

    # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
    def test_read_by_chunk(self):
        # make sure "\r\n" straddles 128 char boundary.
        txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
        reads = ""
        while True:
            c = txt.read(128)
            if not c:
                break
            reads += c
        self.assertEqual(reads, "A"*127+"\nB")

    def test_issue1395_1(self):
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")

        # read one char at a time
        reads = ""
        while True:
            c = txt.read(1)
            if not c:
                break
            reads += c
        self.assertEqual(reads, self.normalized)

    def test_issue1395_2(self):
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        # read in pieces exactly as large as the chunk size
        reads = ""
        while True:
            c = txt.read(4)
            if not c:
                break
            reads += c
        self.assertEqual(reads, self.normalized)

    def test_issue1395_3(self):
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        # mix sized reads with readline()
        reads = txt.read(4)
        reads += txt.read(4)
        reads += txt.readline()
        reads += txt.readline()
        reads += txt.readline()
        self.assertEqual(reads, self.normalized)

    def test_issue1395_4(self):
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        # a sized read followed by read-everything
        reads = txt.read(4)
        reads += txt.read()
        self.assertEqual(reads, self.normalized)

    def test_issue1395_5(self):
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        # seeking away and back to a tell() cookie must resume at the same
        # place; the text returned by the first read is not needed.
        txt.read(4)
        pos = txt.tell()
        txt.seek(0)
        txt.seek(pos)
        self.assertEqual(txt.read(4), "BBB\n")

    def test_issue2282(self):
        # The wrapper must report the same seekable() as its buffer.
        buffer = self.BytesIO(self.testdata)
        txt = self.TextIOWrapper(buffer, encoding="ascii")

        self.assertEqual(buffer.seekable(), txt.seekable())

    def test_append_bom(self):
        # The BOM is not written again when appending to a non-empty file
        filename = support.TESTFN
        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
            with self.open(filename, 'w', encoding=charset) as f:
                f.write('aaa')
                # The position is not needed here; tell() is still called
                # so that the same calls are exercised as before.
                f.tell()
            with self.open(filename, 'rb') as f:
                self.assertEqual(f.read(), 'aaa'.encode(charset))

            with self.open(filename, 'a', encoding=charset) as f:
                f.write('xxx')
            with self.open(filename, 'rb') as f:
                self.assertEqual(f.read(), 'aaaxxx'.encode(charset))

    def test_seek_bom(self):
        # Same test, but when seeking manually
        filename = support.TESTFN
        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
            with self.open(filename, 'w', encoding=charset) as f:
                f.write('aaa')
                pos = f.tell()
            with self.open(filename, 'r+', encoding=charset) as f:
                f.seek(pos)
                f.write('zzz')
                f.seek(0)
                f.write('bbb')
            with self.open(filename, 'rb') as f:
                self.assertEqual(f.read(), 'bbbzzz'.encode(charset))

    def test_errors_property(self):
        # The errors attribute is "strict" unless another handler is given.
        with self.open(support.TESTFN, "w") as f:
            self.assertEqual(f.errors, "strict")
        with self.open(support.TESTFN, "w", errors="replace") as f:
            self.assertEqual(f.errors, "replace")


    def test_threads_write(self):
        # Issue6750: concurrent writes could duplicate data
        event = threading.Event()
        with self.open(support.TESTFN, "w", buffering=1) as f:
            def run(n):
                text = "Thread%03d\n" % n
                # All threads block here and are released together.
                event.wait()
                f.write(text)
            threads = [threading.Thread(target=run, args=(x,))
                       for x in range(20)]
            for t in threads:
                t.start()
            time.sleep(0.02)
            event.set()
            for t in threads:
                t.join()
        with self.open(support.TESTFN) as f:
            content = f.read()
            # Every thread's line must appear exactly once.
            for n in range(20):
                self.assertEqual(content.count("Thread%03d\n" % n), 1)
2115
class CTextIOWrapperTest(TextIOWrapperTest):

    def test_initialization(self):
        # After each rejected __init__ call (bad `newline` type, then bad
        # `newline` value) reading from the wrapper must raise ValueError.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        rejected = ((TypeError, 42), (ValueError, 'xyzzy'))
        for exc_type, bad_newline in rejected:
            self.assertRaises(exc_type, wrapper.__init__, buffered,
                              newline=bad_newline)
            self.assertRaises(ValueError, wrapper.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        raw_file = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(raw_file)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        # Make the wrapper refer to itself, so it is part of a cycle.
        wrapper.x = wrapper
        ref = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        # The weak reference must be dead and the text must be in the file.
        self.assertTrue(ref() is None, ref)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")
2142
class PyTextIOWrapperTest(TextIOWrapperTest):
    """Runs the inherited TextIOWrapper tests; defines none of its own."""
2145
2146
class IncrementalNewlineDecoderTest(unittest.TestCase):
    """Tests for IncrementalNewlineDecoder.

    The decoder class is read from ``self.IncrementalNewlineDecoder``, so
    the class under test must be attached to the test class before running.
    """

    def check_newline_decoding_utf8(self, decoder):
        """Feed UTF-8 byte sequences to *decoder* and check the output.

        The calls are order-dependent: each one continues from the state
        left behind by the previous one.
        """
        # UTF-8 specific tests for a newline decoder
        def _check_decode(b, s, **kwargs):
            # We exercise getstate() / setstate() as well as decode()
            state = decoder.getstate()
            self.assertEqual(decoder.decode(b, **kwargs), s)
            # Rewind and decode again: the result must be reproducible.
            decoder.setstate(state)
            self.assertEqual(decoder.decode(b, **kwargs), s)

        _check_decode(b'\xe8\xa2\x88', "\u8888")

        # Same character, one byte at a time.
        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        # A truncated sequence must be reported when the input is final.
        _check_decode(b'\xe8', "")
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        _check_decode(b'\n', "\n")
        # A lone "\r" is held back until the next input or final=True.
        _check_decode(b'\r', "")
        _check_decode(b'', "\n", final=True)
        _check_decode(b'\r', "\n", final=True)

        _check_decode(b'\r', "")
        _check_decode(b'a', "\na")

        _check_decode(b'\r\r\n', "\n\n")
        _check_decode(b'\r', "")
        _check_decode(b'\r', "\n")
        _check_decode(b'\na', "\na")

        _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
        _check_decode(b'\xe8\xa2\x88', "\u8888")
        _check_decode(b'\n', "\n")
        _check_decode(b'\xe8\xa2\x88\r', "\u8888")
        _check_decode(b'\n', "\n")

    def check_newline_decoding(self, decoder, encoding):
        """Check translation output and the ``newlines`` attribute.

        With an *encoding*, text is encoded and fed to *decoder* one byte
        at a time; with ``encoding=None`` it is fed one character at a time.
        """
        result = []
        if encoding is not None:
            encoder = codecs.getincrementalencoder(encoding)()
            def _decode_bytewise(s):
                # Decode one byte at a time
                for b in encoder.encode(s):
                    result.append(decoder.decode(bytes([b])))
        else:
            encoder = None
            def _decode_bytewise(s):
                # Decode one char at a time
                for c in s:
                    result.append(decoder.decode(c))
        self.assertIsNone(decoder.newlines)
        _decode_bytewise("abc\n\r")
        self.assertEqual(decoder.newlines, '\n')
        _decode_bytewise("\nabc")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc")
        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
        # After reset() the decoder must have forgotten the newlines seen.
        decoder.reset()
        data = "abc"
        if encoder is not None:
            encoder.reset()
            data = encoder.encode(data)
        self.assertEqual(decoder.decode(data), "abc")
        self.assertIsNone(decoder.newlines)

    def test_newline_decoder(self):
        """Run the generic checks per encoding, then the UTF-8 checks."""
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc is None:
                inner = None
            else:
                inner = codecs.getincrementaldecoder(enc)()
            decoder = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(decoder, enc)
        inner = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding_utf8(decoder)

    def test_newline_bytes(self):
        """U+0D00 and U+0A00 must not be treated as line endings."""
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def _check(dec):
            self.assertIsNone(dec.newlines)
            self.assertEqual(dec.decode("\u0D00"), "\u0D00")
            self.assertIsNone(dec.newlines)
            self.assertEqual(dec.decode("\u0A00"), "\u0A00")
            self.assertIsNone(dec.newlines)
        for translate in (False, True):
            _check(self.IncrementalNewlineDecoder(None, translate=translate))
2252
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Runs the inherited newline-decoder tests; defines none of its own."""
2255
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Runs the inherited newline-decoder tests; defines none of its own."""
Antoine Pitrou180a3362008-12-14 16:36:46 +00002258
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002259
# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002261
class MiscIOTest(unittest.TestCase):
    """Assorted checks on the module under test.

    The module is read from ``self.io`` (set by subclasses); ``self.open``
    and the class names used below must also be attached to the test class
    before running.
    """

    def tearDown(self):
        """Remove the scratch file used by the tests."""
        support.unlink(support.TESTFN)

    def test___all__(self):
        """Every name in ``__all__`` exists and has the expected kind."""
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertIsNotNone(obj, name)
            if name == "open":
                continue
            elif "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase))

    def test_attributes(self):
        """Check ``name`` and ``mode`` on each layer of an opened file."""
        # ``with`` blocks make sure every file is closed even when an
        # assertion fails, so tearDown can always remove the scratch file.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        with self.open(support.TESTFN, "U") as f:
            self.assertEqual(f.name, support.TESTFN)
            self.assertEqual(f.buffer.name, support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode, "U")
            self.assertEqual(f.buffer.mode, "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        with self.open(support.TESTFN, "w+") as f:
            self.assertEqual(f.mode, "w+")
            self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
            self.assertEqual(f.buffer.raw.mode, "rb+")

            # Second file object on the same descriptor; closefd=False
            # leaves ownership of the descriptor with ``f``.
            with self.open(f.fileno(), "wb", closefd=False) as g:
                self.assertEqual(g.mode, "wb")
                self.assertEqual(g.raw.mode, "wb")
                self.assertEqual(g.name, f.fileno())
                self.assertEqual(g.raw.name, f.fileno())

    def test_io_after_close(self):
        """Operations on a closed file must raise ValueError."""
        # The write modes come first so the file exists by the time the
        # read modes open it.
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            # peek/read1/readinto exist only on some of the returned types.
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            # Hand write() the payload type that matches the mode.
            self.assertRaises(ValueError, f.write,
                              b"" if "b" in kwargs['mode'] else "")
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        """Check BlockingIOError's constructor and its garbage collection."""
        # Various BlockingIOError issues
        self.assertRaises(TypeError, self.BlockingIOError)
        self.assertRaises(TypeError, self.BlockingIOError, 1)
        self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
        self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
        b = self.BlockingIOError(1, "")
        self.assertEqual(b.characters_written, 0)
        # A str subclass can carry attributes and be weakly referenced,
        # which lets the test build a cycle through the exception.
        class C(str):
            pass
        c = C("")
        b = self.BlockingIOError(1, c)
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_abcs(self):
        """The visible base classes are ABCs."""
        # Test the visible base classes are ABCs.
        self.assertIsInstance(self.IOBase, abc.ABCMeta)
        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        """Check which ABCs of *abcmodule* each kind of opened file matches.

        *abcmodule* is any object exposing IOBase, RawIOBase,
        BufferedIOBase and TextIOBase attributes.
        """
        # Unbuffered binary: raw only.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        # Buffered binary: buffered only.
        with self.open(support.TESTFN, "wb") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        # Text: text only.
        with self.open(support.TESTFN, "w") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertIsInstance(f, abcmodule.TextIOBase)

    def test_abc_inheritance(self):
        """Implementations inherit from the ABCs attached to this class."""
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        """Implementations inherit from the ABCs of the ``io`` module."""
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)
2397
class CMiscIOTest(MiscIOTest):
    """Runs the inherited MiscIOTest checks with ``io`` as the module."""

    io = io
2400
class PyMiscIOTest(MiscIOTest):
    """Runs the inherited MiscIOTest checks with ``pyio`` as the module."""

    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002403
def test_main():
    """Attach the implementation namespaces to the test classes and run them.

    Classes whose name starts with "C" receive attributes taken from ``io``,
    those starting with "Py" receive them from ``pyio``; any other class is
    run without injected attributes.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,)

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]

    c_io_ns = {}
    for member in all_members:
        c_io_ns[member] = getattr(io, member)
    py_io_ns = {}
    for member in all_members:
        py_io_ns[member] = getattr(pyio, member)

    # Each mock is published under its plain name but resolved to the
    # "C"- or "Py"-prefixed variant defined at module level.
    module_globals = globals()
    for mock in mocks:
        c_io_ns[mock.__name__] = module_globals["C" + mock.__name__]
    for mock in mocks:
        py_io_ns[mock.__name__] = module_globals["Py" + mock.__name__]

    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper

    for test in tests:
        test_name = test.__name__
        if test_name.startswith("C"):
            namespace = c_io_ns
        elif test_name.startswith("Py"):
            namespace = py_io_ns
        else:
            # Neither prefix: nothing to inject.
            continue
        for attr, value in namespace.items():
            setattr(test, attr, value)

    support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002436
# Run the suite only when this file is executed directly, not on import.
if __name__ == "__main__":
    test_main()