blob: bf0d03fa4d35627bf4a8b6a2e6ed8075e9a1b34f [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as an attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import threading
27import random
Guido van Rossum28524c72007-02-27 05:47:44 +000028import unittest
Benjamin Peterson59406a92009-03-26 17:10:29 +000029import warnings
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000030import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000031import abc
Georg Brandl1b37e872010-03-14 10:45:50 +000032from itertools import cycle, count
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000033from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000034from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000035
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000036import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000037import io # C implementation of io
38import _pyio as pyio # Python implementation of io
Guido van Rossum28524c72007-02-27 05:47:44 +000039
Guido van Rossuma9e20242007-03-08 00:43:48 +000040
def _default_chunk_size():
    """Return the ``_CHUNK_SIZE`` of a freshly opened text-mode file."""
    # This very file is opened simply because it is known to exist.
    stream = open(__file__, "r", encoding="latin1")
    try:
        return stream._CHUNK_SIZE
    finally:
        stream.close()
45
46
class MockRawIO:
    """Scripted raw stream used to drive the buffered-IO tests.

    Reads are served from a queue of pre-supplied chunks, writes are
    recorded, and every read()/readinto() call is counted in ``_reads``.
    A ``None`` chunk in the queue is handed back as-is by both read() and
    readinto().
    """

    def __init__(self, read_stack=()):
        # Chunks handed out, in order, by read() and readinto().
        self._read_stack = list(read_stack)
        # Every payload passed to write(), stored as bytes.
        self._write_stack = []
        # Number of read()/readinto() calls made so far.
        self._reads = 0

    def read(self, n=None):
        """Return the next queued chunk whole, or b"" once the queue is empty.

        The size argument *n* is accepted but ignored.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted queue means end of data; anything else
            # should surface instead of being silently swallowed.
            return b""

    def write(self, b):
        """Record a bytes copy of *b* and return ``len(b)``."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy data from the head of the queue into *buf*.

        Returns the number of bytes copied, 0 when the queue is empty, or
        None when the head of the queue is None (that entry is consumed).
        A chunk larger than *buf* is split: the part that does not fit
        stays at the head of the queue.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            del self._read_stack[0]
            buf[:n] = data
            return n
        else:
            buf[:] = data[:max_len]
            self._read_stack[0] = data[max_len:]
            return max_len

    def truncate(self, pos=None):
        return pos
105
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO layered over the C implementation's RawIOBase."""
108
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO layered over the pure-Python implementation's RawIOBase."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000111
Guido van Rossuma9e20242007-03-08 00:43:48 +0000112
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods report deliberately bogus results."""

    def write(self, b):
        # Report twice the count the parent class returned.
        recorded = super().write(b)
        return recorded * 2

    def read(self, n=None):
        # Hand back the parent's chunk repeated twice.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        # Always a negative position, whatever was requested.
        return -123

    def tell(self):
        # Always a negative position.
        return -456

    def readinto(self, buf):
        # Let the parent fill the buffer, then report five times its size.
        super().readinto(buf)
        capacity = len(buf)
        return capacity * 5
129
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO layered over the C implementation's RawIOBase."""
132
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO layered over the pure-Python RawIOBase."""
135
136
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() marks it closed and raises IOError."""

    closed = 0

    def close(self):
        # Later calls are silent no-ops; only the first one fails.
        if self.closed:
            return
        self.closed = 1
        raise IOError
144
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO layered over the C implementation's RawIOBase."""
147
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO layered over the pure-Python RawIOBase."""
150
151
class MockFileIO:
    """Mixin that logs the size of every read made through the stream.

    ``read_history`` receives one entry per read()/readinto() call: the
    number of bytes obtained, or None when the underlying call gave None.
    """

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000167
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO backed by the C implementation's BytesIO."""
Guido van Rossuma9e20242007-03-08 00:43:48 +0000170
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO backed by the pure-Python implementation's BytesIO."""
173
174
class MockNonBlockWriterIO:
    """Write-recording raw stream that can simulate one blocking write.

    Subclasses supply the ``BlockingIOError`` class raised by write().
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and forget it."""
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Record *b*; stop short and raise if the blocker char is in it.

        When the blocker is found, only the data before it is recorded,
        the blocker is disarmed, and BlockingIOError is raised with the
        number of bytes recorded as its third argument.
        """
        payload = bytes(b)
        blocker = self._blocker_char
        if blocker:
            try:
                cut = payload.index(blocker)
            except ValueError:
                # Blocker not present: fall through to a normal write.
                cut = None
            if cut is not None:
                self._blocker_char = None
                self._write_stack.append(payload[:cut])
                raise self.BlockingIOError(0, "test blocking", cut)
        self._write_stack.append(payload)
        return len(payload)
213
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO over the C RawIOBase, raising io.BlockingIOError."""

    BlockingIOError = io.BlockingIOError
216
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO over the pure-Python RawIOBase, raising pyio.BlockingIOError."""

    BlockingIOError = pyio.BlockingIOError
219
Guido van Rossuma9e20242007-03-08 00:43:48 +0000220
Guido van Rossum28524c72007-02-27 05:47:44 +0000221class IOTest(unittest.TestCase):
222
Neal Norwitze7789b12008-03-24 06:18:09 +0000223 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000224 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000225
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000226 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000227 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000228
Guido van Rossum28524c72007-02-27 05:47:44 +0000229 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000230 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000231 f.truncate(0)
232 self.assertEqual(f.tell(), 5)
233 f.seek(0)
234
235 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000236 self.assertEqual(f.seek(0), 0)
237 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000238 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000239 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000240 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000241 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000242 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000243 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000244 self.assertEqual(f.seek(-1, 2), 13)
245 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000246
Guido van Rossum87429772007-04-10 21:06:59 +0000247 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000248 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000249 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000250
Guido van Rossum9b76da62007-04-11 01:09:03 +0000251 def read_ops(self, f, buffered=False):
252 data = f.read(5)
253 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000254 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000255 self.assertEqual(f.readinto(data), 5)
256 self.assertEqual(data, b" worl")
257 self.assertEqual(f.readinto(data), 2)
258 self.assertEqual(len(data), 5)
259 self.assertEqual(data[:2], b"d\n")
260 self.assertEqual(f.seek(0), 0)
261 self.assertEqual(f.read(20), b"hello world\n")
262 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000263 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000264 self.assertEqual(f.seek(-6, 2), 6)
265 self.assertEqual(f.read(5), b"world")
266 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000267 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000268 self.assertEqual(f.seek(-6, 1), 5)
269 self.assertEqual(f.read(5), b" worl")
270 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000271 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000272 if buffered:
273 f.seek(0)
274 self.assertEqual(f.read(), b"hello world\n")
275 f.seek(6)
276 self.assertEqual(f.read(), b"world\n")
277 self.assertEqual(f.read(), b"")
278
Guido van Rossum34d69e52007-04-10 20:08:41 +0000279 LARGE = 2**31
280
Guido van Rossum53807da2007-04-10 19:01:47 +0000281 def large_file_ops(self, f):
282 assert f.readable()
283 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000284 self.assertEqual(f.seek(self.LARGE), self.LARGE)
285 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000286 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000287 self.assertEqual(f.tell(), self.LARGE + 3)
288 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000289 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000290 self.assertEqual(f.tell(), self.LARGE + 2)
291 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000292 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000293 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000294 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
295 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000296 self.assertEqual(f.read(2), b"x")
297
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000298 def test_invalid_operations(self):
299 # Try writing on a file opened in read mode and vice-versa.
300 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000301 with self.open(support.TESTFN, mode) as fp:
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000302 self.assertRaises(IOError, fp.read)
303 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000304 with self.open(support.TESTFN, "rb") as fp:
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000305 self.assertRaises(IOError, fp.write, b"blah")
306 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000307 with self.open(support.TESTFN, "r") as fp:
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000308 self.assertRaises(IOError, fp.write, "blah")
309 self.assertRaises(IOError, fp.writelines, ["blah\n"])
310
Guido van Rossum28524c72007-02-27 05:47:44 +0000311 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000312 with self.open(support.TESTFN, "wb", buffering=0) as f:
313 self.assertEqual(f.readable(), False)
314 self.assertEqual(f.writable(), True)
315 self.assertEqual(f.seekable(), True)
316 self.write_ops(f)
317 with self.open(support.TESTFN, "rb", buffering=0) as f:
318 self.assertEqual(f.readable(), True)
319 self.assertEqual(f.writable(), False)
320 self.assertEqual(f.seekable(), True)
321 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000322
Guido van Rossum87429772007-04-10 21:06:59 +0000323 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000324 with self.open(support.TESTFN, "wb") as f:
325 self.assertEqual(f.readable(), False)
326 self.assertEqual(f.writable(), True)
327 self.assertEqual(f.seekable(), True)
328 self.write_ops(f)
329 with self.open(support.TESTFN, "rb") as f:
330 self.assertEqual(f.readable(), True)
331 self.assertEqual(f.writable(), False)
332 self.assertEqual(f.seekable(), True)
333 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000334
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000335 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000336 with self.open(support.TESTFN, "wb") as f:
337 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
338 with self.open(support.TESTFN, "rb") as f:
339 self.assertEqual(f.readline(), b"abc\n")
340 self.assertEqual(f.readline(10), b"def\n")
341 self.assertEqual(f.readline(2), b"xy")
342 self.assertEqual(f.readline(4), b"zzy\n")
343 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000344 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000345 self.assertRaises(TypeError, f.readline, 5.3)
346 with self.open(support.TESTFN, "r") as f:
347 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000348
Guido van Rossum28524c72007-02-27 05:47:44 +0000349 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000350 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000351 self.write_ops(f)
352 data = f.getvalue()
353 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000354 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000355 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000356
Guido van Rossum53807da2007-04-10 19:01:47 +0000357 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000358 # On Windows and Mac OSX this test comsumes large resources; It takes
359 # a long time to build the >2GB file and takes >2GB of disk space
360 # therefore the resource must be enabled to run this test.
361 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000362 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000363 print("\nTesting large file ops skipped on %s." % sys.platform,
364 file=sys.stderr)
365 print("It requires %d bytes and a long time." % self.LARGE,
366 file=sys.stderr)
367 print("Use 'regrtest.py -u largefile test_io' to run it.",
368 file=sys.stderr)
369 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000370 with self.open(support.TESTFN, "w+b", 0) as f:
371 self.large_file_ops(f)
372 with self.open(support.TESTFN, "w+b") as f:
373 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000374
375 def test_with_open(self):
376 for bufsize in (0, 1, 100):
377 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000378 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000379 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000380 self.assertEqual(f.closed, True)
381 f = None
382 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000383 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000384 1/0
385 except ZeroDivisionError:
386 self.assertEqual(f.closed, True)
387 else:
388 self.fail("1/0 didn't raise an exception")
389
Antoine Pitrou08838b62009-01-21 00:55:13 +0000390 # issue 5008
391 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000392 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000393 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000394 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000395 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000396 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000397 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000398 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000399 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000400
Guido van Rossum87429772007-04-10 21:06:59 +0000401 def test_destructor(self):
402 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000403 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000404 def __del__(self):
405 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000406 try:
407 f = super().__del__
408 except AttributeError:
409 pass
410 else:
411 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000412 def close(self):
413 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000414 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000415 def flush(self):
416 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000417 super().flush()
418 f = MyFileIO(support.TESTFN, "wb")
419 f.write(b"xxx")
420 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000421 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000422 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000423 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000424 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000425
426 def _check_base_destructor(self, base):
427 record = []
428 class MyIO(base):
429 def __init__(self):
430 # This exercises the availability of attributes on object
431 # destruction.
432 # (in the C version, close() is called by the tp_dealloc
433 # function, not by __del__)
434 self.on_del = 1
435 self.on_close = 2
436 self.on_flush = 3
437 def __del__(self):
438 record.append(self.on_del)
439 try:
440 f = super().__del__
441 except AttributeError:
442 pass
443 else:
444 f()
445 def close(self):
446 record.append(self.on_close)
447 super().close()
448 def flush(self):
449 record.append(self.on_flush)
450 super().flush()
451 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000452 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000453 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000454 self.assertEqual(record, [1, 2, 3])
455
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000456 def test_IOBase_destructor(self):
457 self._check_base_destructor(self.IOBase)
458
459 def test_RawIOBase_destructor(self):
460 self._check_base_destructor(self.RawIOBase)
461
462 def test_BufferedIOBase_destructor(self):
463 self._check_base_destructor(self.BufferedIOBase)
464
465 def test_TextIOBase_destructor(self):
466 self._check_base_destructor(self.TextIOBase)
467
Guido van Rossum87429772007-04-10 21:06:59 +0000468 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000469 with self.open(support.TESTFN, "wb") as f:
470 f.write(b"xxx")
471 with self.open(support.TESTFN, "rb") as f:
472 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000473
Guido van Rossumd4103952007-04-12 05:44:49 +0000474 def test_array_writes(self):
475 a = array.array('i', range(10))
Antoine Pitrouc3b39242009-01-03 16:59:18 +0000476 n = len(a.tostring())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000477 with self.open(support.TESTFN, "wb", 0) as f:
478 self.assertEqual(f.write(a), n)
479 with self.open(support.TESTFN, "wb") as f:
480 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000481
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000482 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000483 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000484 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000485
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000486 def test_read_closed(self):
487 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000488 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000489 with self.open(support.TESTFN, "r") as f:
490 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000491 self.assertEqual(file.read(), "egg\n")
492 file.seek(0)
493 file.close()
494 self.assertRaises(ValueError, file.read)
495
496 def test_no_closefd_with_filename(self):
497 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000498 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000499
500 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000501 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000502 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000503 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000504 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000505 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000506 self.assertEqual(file.buffer.raw.closefd, False)
507
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000508 def test_garbage_collection(self):
509 # FileIO objects are collected, and collecting them flushes
510 # all data to disk.
511 f = self.FileIO(support.TESTFN, "wb")
512 f.write(b"abcxxx")
513 f.f = f
514 wr = weakref.ref(f)
515 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000516 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000517 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000518 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000519 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000520
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000521 def test_unbounded_file(self):
522 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
523 zero = "/dev/zero"
524 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000525 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000526 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000527 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000528 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000529 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000530 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000531 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000532 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000533 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000534 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000535 self.assertRaises(OverflowError, f.read)
536
class CIOTest(IOTest):
    # Runs the IOTest suite unchanged; presumably bound to the C
    # implementation elsewhere in the file (TODO confirm).
    pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000539
class PyIOTest(IOTest):
    # Runs the IOTest suite unchanged; presumably bound to the pure-Python
    # implementation elsewhere in the file (TODO confirm).
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000542
Guido van Rossuma9e20242007-03-08 00:43:48 +0000543
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    #
    # Mixin: the concrete test class provides self.tp (the buffered type
    # under test) as well as self.MockRawIO and self.CloseFailureIO.

    def test_detach(self):
        raw = self.MockRawIO()
        buf = self.tp(raw)
        self.assertIs(buf.detach(), raw)
        # A second detach() has nothing left to give back.
        self.assertRaises(ValueError, buf.detach)

    def test_fileno(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        self.assertEqual(42, bufio.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Invalid whence
        self.assertRaises(ValueError, bufio.seek, 0, -1)
        self.assertRaises(ValueError, bufio.seek, 0, 3)

    def test_override_destructor(self):
        # Destroying a subclass instance must run __del__ then close();
        # flush() is expected as well only when the object is writable.
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        if writable:
            self.assertEqual(record, [1, 2, 3])
        else:
            self.assertEqual(record, [1, 2])

    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        def _with():
            with bufio:
                pass
        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            self.tp(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception IOError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    def test_repr(self):
        # The repr shows the type's qualified name and, once the raw
        # stream has one, its name.
        raw = self.MockRawIO()
        b = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(b), "<%s>" % clsname)
        raw.name = "dummy"
        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
635
Guido van Rossum78892e42007-04-06 17:31:18 +0000636
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000637class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
638 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000639
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000640 def test_constructor(self):
641 rawio = self.MockRawIO([b"abc"])
642 bufio = self.tp(rawio)
643 bufio.__init__(rawio)
644 bufio.__init__(rawio, buffer_size=1024)
645 bufio.__init__(rawio, buffer_size=16)
646 self.assertEquals(b"abc", bufio.read())
647 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
648 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
649 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
650 rawio = self.MockRawIO([b"abc"])
651 bufio.__init__(rawio)
652 self.assertEquals(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000653
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000654 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000655 for arg in (None, 7):
656 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
657 bufio = self.tp(rawio)
658 self.assertEquals(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000659 # Invalid args
660 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000661
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000662 def test_read1(self):
663 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
664 bufio = self.tp(rawio)
665 self.assertEquals(b"a", bufio.read(1))
666 self.assertEquals(b"b", bufio.read1(1))
667 self.assertEquals(rawio._reads, 1)
668 self.assertEquals(b"c", bufio.read1(100))
669 self.assertEquals(rawio._reads, 1)
670 self.assertEquals(b"d", bufio.read1(100))
671 self.assertEquals(rawio._reads, 2)
672 self.assertEquals(b"efg", bufio.read1(100))
673 self.assertEquals(rawio._reads, 3)
674 self.assertEquals(b"", bufio.read1(100))
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000675 self.assertEquals(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000676 # Invalid args
677 self.assertRaises(ValueError, bufio.read1, -1)
678
679 def test_readinto(self):
680 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
681 bufio = self.tp(rawio)
682 b = bytearray(2)
683 self.assertEquals(bufio.readinto(b), 2)
684 self.assertEquals(b, b"ab")
685 self.assertEquals(bufio.readinto(b), 2)
686 self.assertEquals(b, b"cd")
687 self.assertEquals(bufio.readinto(b), 2)
688 self.assertEquals(b, b"ef")
689 self.assertEquals(bufio.readinto(b), 1)
690 self.assertEquals(b, b"gf")
691 self.assertEquals(bufio.readinto(b), 0)
692 self.assertEquals(b, b"gf")
693
def test_readlines(self):
    # Each call gets a fresh reader so the three hint variants
    # (default, 5, None) are checked independently.
    def bufio():
        rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
        return self.tp(rawio)
    self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
    self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
    self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
701
def test_buffering(self):
    # For each (buffer size, buffered read sizes, expected raw read
    # sizes) triple, check both the data returned and the sequence of
    # raw reads recorded in ``rawio.read_history``.
    data = b"abcdefghi"
    dlen = len(data)

    tests = [
        [100, [3, 1, 4, 8], [dlen, 0]],
        [100, [3, 3, 3], [dlen]],
        [4, [1, 2, 4, 2], [4, 4, 1]],
    ]

    for bufsize, buf_read_sizes, raw_read_sizes in tests:
        rawio = self.MockFileIO(data)
        bufio = self.tp(rawio, buffer_size=bufsize)
        pos = 0
        for nbytes in buf_read_sizes:
            self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
            pos += nbytes
        # this is mildly implementation-dependent
        self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000721
def test_read_non_blocking(self):
    # Inject some None's in there to simulate EWOULDBLOCK
    rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
    bufio = self.tp(rawio)

    self.assertEqual(b"abcd", bufio.read(6))
    self.assertEqual(b"e", bufio.read(1))
    self.assertEqual(b"fg", bufio.read())
    self.assertEqual(b"", bufio.peek(1))
    self.assertIsNone(bufio.read())
    self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000733
def test_read_past_eof(self):
    # Asking for far more than is available returns just the data.
    rawio = self.MockRawIO((b"abc", b"d", b"efg"))
    bufio = self.tp(rawio)

    self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000739
def test_read_all(self):
    # read() with no argument returns everything up to EOF.
    rawio = self.MockRawIO((b"abc", b"d", b"efg"))
    bufio = self.tp(rawio)

    self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000745
def test_threads(self):
    try:
        # Write out many bytes with exactly the same number of 0's,
        # 1's... 255's. This will help us check that concurrent reading
        # doesn't duplicate or forget contents.
        N = 1000
        values = list(range(256)) * N
        random.shuffle(values)
        payload = bytes(values)
        with self.open(support.TESTFN, "wb") as f:
            f.write(payload)
        with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            errors = []
            results = []

            # Named ``reader`` (not ``f``) so it cannot be confused with
            # the file handle used above.
            def reader():
                try:
                    # Intra-buffer read then buffer-flushing read
                    for n in cycle([1, 19]):
                        chunk = bufio.read(n)
                        if not chunk:
                            break
                        # list.append() is atomic
                        results.append(chunk)
                except Exception as e:
                    errors.append(e)
                    raise

            threads = [threading.Thread(target=reader) for x in range(20)]
            for t in threads:
                t.start()
            time.sleep(0.02)  # yield
            for t in threads:
                t.join()
            self.assertFalse(errors,
                "the following exceptions were caught: %r" % errors)
            received = b''.join(results)
            # Every byte value must have been read exactly N times.
            for i in range(256):
                self.assertEqual(received.count(bytes([i])), N)
    finally:
        support.unlink(support.TESTFN)
787
def test_misbehaved_io(self):
    # seek() and tell() on top of a misbehaving raw stream must raise
    # IOError.
    rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
    bufio = self.tp(rawio)
    self.assertRaises(IOError, bufio.seek, 0)
    self.assertRaises(IOError, bufio.tell)
793
class CBufferedReaderTest(BufferedReaderTest):
    # Runs the shared reader tests against the C implementation and adds
    # checks that only hold for it.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After a failed re-initialization the object must refuse to read.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # Remove the scratch file once the test is over.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.f = f  # reference cycle: only the cyclic GC can free it
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000834
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the shared reader tests against the pure-Python implementation.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000837
Guido van Rossuma9e20242007-03-08 00:43:48 +0000838
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Implementation-independent tests for buffered writers.  Concrete
    # subclasses set ``tp`` to the class under test.
    write_mode = "wb"

    def test_constructor(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
        # A valid re-initialization makes the object usable again.
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # detach() must flush pending data to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # ``intermediate_func(bufio)`` is called after every write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_destructor(self):
        # Dropping the last reference must flush buffered data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # Remove the scratch file once the test is over.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []

                # Named ``writer`` (not ``f``) so it cannot be confused
                # with the file handle opened below.
                def writer():
                    try:
                        while True:
                            try:
                                chunk = queue.popleft()
                            except IndexError:
                                return
                            bufio.write(chunk)
                    except Exception as e:
                        errors.append(e)
                        raise

                threads = [threading.Thread(target=writer) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02)  # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Every byte value must have been written exactly N times.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001052
1053
class CBufferedWriterTest(BufferedWriterTest):
    # Runs the shared writer tests against the C implementation and adds
    # checks that only hold for it.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After a failed re-initialization the object must refuse to write.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        # Remove the scratch file once the test is over.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        f.x = f  # reference cycle: only the cyclic GC can free it
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1091
1092
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared writer tests against the pure-Python implementation.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001095
class BufferedRWPairTest(unittest.TestCase):
    # Implementation-independent tests for BufferedRWPair.  Concrete
    # subclasses set ``tp`` to the class under test.

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_detach(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_deprecation(self):
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        # A fresh pair per call keeps the hint variants independent.
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        # The original repeated the no-argument call here; cover the
        # explicit None hint instead.
        self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        # The pair reports a tty as soon as either side is one.
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001213
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared pair tests against the C implementation.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001216
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared pair tests against the pure-Python implementation.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001219
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001220
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Tests for read/write buffered objects.  Inherits the reader and
    # writer suites and adds checks for mixing the two on one stream.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack)  # Buffer writes
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # ``read_func(bufio[, n])`` abstracts over read/readinto/peek so
        # the same scenario can be replayed with each of them.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        # Peek at the current position after every write ...
        def _peek_ahead(bufio):
            bufio.peek(1)
        self.check_writes(_peek_ahead)
        # ... and peek one byte back, then restore the position.
        def _peek_behind(bufio):
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek_behind)

    def test_writes_and_reads(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            s = raw.getvalue()
            self.assertEqual(s,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA")  # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2)  # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)
1375
class CBufferedRandomTest(BufferedRandomTest):
    # Runs the shared random-access tests against the C implementation.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_garbage_collection(self):
        # Reuse both C-only garbage collection checks.
        CBufferedReaderTest.test_garbage_collection(self)
        CBufferedWriterTest.test_garbage_collection(self)
1392
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the shared random-access tests against the pure-Python
    # implementation.
    tp = pyio.BufferedRandom
1395
1396
# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
# properties:
#   - A single output character can correspond to many bytes of input.
#   - The number of input bytes to complete the character can be
#     undetermined until the last input byte is received.
#   - The number of input bytes can vary depending on previous input.
#   - A single input byte can correspond to many characters of output.
#   - The number of output characters can be undetermined until the
#     last input byte is received.
#   - The number of output characters can vary depending on previous input.
1407
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and nothing buffered."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return ``(buffered_bytes, flags)`` with I and O packed in flags."""
        i, o = self.i ^ 1, self.o ^ 1  # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        buffer, io = state
        self.buffer = bytearray(buffer)
        i, o = divmod(io, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Decode *input* (bytes) and return every word completed by it.

        With *final* true, a partially buffered word is flushed as well.
        """
        pieces = []
        for b in input:
            if self.i == 0:  # variable-length, terminated with period
                if b == ord('.'):
                    if self.buffer:
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else:  # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer:  # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        'i...' and 'o...' words only update I or O and produce no output.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            output = self.buffer.decode('ascii')
            if self.o:
                # pad out with hyphens, then truncate to output length
                output = output.ljust(self.o, '-')[:self.o]
            output += '.'
        self.buffer = bytearray()
        return output

    # The search function below only answers while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: resolve 'test_decoder' to this class.

        Returns None (codec not found) unless ``codecEnabled`` is set.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)

# Register the previous decoder for testing.
# Disabled by default, tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1496
1497
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag passed to decode(), expected
    # decoded text).  The list is also iterated by
    # TextIOWrapperTest.test_seek_and_tell, so keep the 3-tuple shape.
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases, each on a fresh decoder.
        for data, eof, expected in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(data, eof), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', final=True), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001540
class TextIOWrapperTest(unittest.TestCase):
    # Base test case for a TextIOWrapper implementation.  The tests reach
    # the types under test through attributes of self (BytesIO,
    # BufferedReader, BufferedWriter, TextIOWrapper, open, CloseFailureIO);
    # none of them is defined in this class, so they are expected to be
    # supplied from outside it.
    # Comments are used instead of docstrings on the test methods because
    # unittest shows a method's docstring in place of its name.

    def setUp(self):
        self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
        self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def test_constructor(self):
        # __init__ may be called again on a live object to reconfigure it.
        r = self.BytesIO(b"\xc3\xa9\n\n")
        b = self.BufferedReader(r, 1000)
        t = self.TextIOWrapper(b)
        t.__init__(b, encoding="latin1", newline="\r\n")
        self.assertEqual(t.encoding, "latin1")
        self.assertEqual(t.line_buffering, False)
        t.__init__(b, encoding="utf8", line_buffering=True)
        self.assertEqual(t.encoding, "utf8")
        self.assertEqual(t.line_buffering, True)
        self.assertEqual("\xe9\n", t.readline())
        self.assertRaises(TypeError, t.__init__, b, newline=42)
        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')

    def test_detach(self):
        r = self.BytesIO()
        b = self.BufferedWriter(r)
        t = self.TextIOWrapper(b)
        self.assertIs(t.detach(), b)

        # detach() flushes pending text; a second detach() must fail.
        t = self.TextIOWrapper(b, encoding="ascii")
        t.write("howdy")
        self.assertFalse(r.getvalue())
        t.detach()
        self.assertEqual(r.getvalue(), b"howdy")
        self.assertRaises(ValueError, t.detach)

    def test_repr(self):
        raw = self.BytesIO("hello".encode("utf-8"))
        b = self.BufferedReader(raw)
        t = self.TextIOWrapper(b, encoding="utf-8")
        modname = self.TextIOWrapper.__module__
        self.assertEqual(repr(t),
                         "<%s.TextIOWrapper encoding='utf-8'>" % modname)
        raw.name = "dummy"
        self.assertEqual(repr(t),
                         "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
        raw.name = b"dummy"
        self.assertEqual(repr(t),
                         "<%s.TextIOWrapper name=b'dummy' encoding='utf-8'>" % modname)

    def test_line_buffering(self):
        r = self.BytesIO()
        b = self.BufferedWriter(r, 1000)
        t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
        t.write("X")
        self.assertEqual(r.getvalue(), b"")  # No flush happened
        t.write("Y\nZ")
        self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed
        t.write("A\rB")
        self.assertEqual(r.getvalue(), b"XY\nZA\rB")

    def test_encoding(self):
        # Check the encoding attribute is always set, and valid
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="utf8")
        self.assertEqual(t.encoding, "utf8")
        t = self.TextIOWrapper(b)
        self.assertTrue(t.encoding is not None)
        codecs.lookup(t.encoding)

    def test_encoding_errors_reading(self):
        # (1) default
        b = self.BytesIO(b"abc\n\xff\n")
        t = self.TextIOWrapper(b, encoding="ascii")
        self.assertRaises(UnicodeError, t.read)
        # (2) explicit strict
        b = self.BytesIO(b"abc\n\xff\n")
        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
        self.assertRaises(UnicodeError, t.read)
        # (3) ignore
        b = self.BytesIO(b"abc\n\xff\n")
        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
        self.assertEqual(t.read(), "abc\n\n")
        # (4) replace
        b = self.BytesIO(b"abc\n\xff\n")
        t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
        self.assertEqual(t.read(), "abc\n\ufffd\n")

    def test_encoding_errors_writing(self):
        # (1) default
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="ascii")
        self.assertRaises(UnicodeError, t.write, "\xff")
        # (2) explicit strict
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
        self.assertRaises(UnicodeError, t.write, "\xff")
        # (3) ignore
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
                             newline="\n")
        t.write("abc\xffdef\n")
        t.flush()
        self.assertEqual(b.getvalue(), b"abcdef\n")
        # (4) replace
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
                             newline="\n")
        t.write("abc\xffdef\n")
        t.flush()
        self.assertEqual(b.getvalue(), b"abc?def\n")

    def test_newlines(self):
        input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]

        # Each entry: [newline argument, lines expected when reading].
        tests = [
            [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
            [ '', input_lines ],
            [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
            [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
            [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
        ]
        encodings = (
            'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )

        # Try a range of buffer sizes to test the case where \r is the last
        # character in TextIOWrapper._pending_line.
        for encoding in encodings:
            data = ''.join(input_lines).encode(encoding)
            for do_reads in (False, True):
                for bufsize in range(1, 10):
                    for newline, exp_lines in tests:
                        bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                        textio = self.TextIOWrapper(bufio, newline=newline,
                                                  encoding=encoding)
                        if do_reads:
                            # Mix read(2) with readline(): every expected
                            # line is at least two characters long.
                            got_lines = []
                            while True:
                                c2 = textio.read(2)
                                if c2 == '':
                                    break
                                self.assertEqual(len(c2), 2)
                                got_lines.append(c2 + textio.readline())
                        else:
                            got_lines = list(textio)

                        for got_line, exp_line in zip(got_lines, exp_lines):
                            self.assertEqual(got_line, exp_line)
                        self.assertEqual(len(got_lines), len(exp_lines))

    def test_newlines_input(self):
        testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
        normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
        for newline, expected in [
            (None, normalized.decode("ascii").splitlines(True)),
            ("", testdata.decode("ascii").splitlines(True)),
            ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
            ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
            ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
            ]:
            buf = self.BytesIO(testdata)
            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
            self.assertEqual(txt.readlines(), expected)
            txt.seek(0)
            self.assertEqual(txt.read(), "".join(expected))

    def test_newlines_output(self):
        # Maps the newline argument to the bytes expected on output.
        testdict = {
            "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
            "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
            "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
            "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
            }
        # newline=None is expected to behave like newline=os.linesep.
        tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
        for newline, expected in tests:
            buf = self.BytesIO()
            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
            txt.write("AAA\nB")
            txt.write("BB\nCCC\n")
            txt.write("X\rY\r\nZ")
            txt.flush()
            self.assertEqual(buf.closed, False)
            self.assertEqual(buf.getvalue(), expected)

    def test_destructor(self):
        # Dropping the last reference must flush and close the raw stream.
        l = []
        base = self.BytesIO
        class MyBytesIO(base):
            def close(self):
                l.append(self.getvalue())
                base.close(self)
        b = MyBytesIO()
        t = self.TextIOWrapper(b, encoding="ascii")
        t.write("abc")
        del t
        support.gc_collect()
        self.assertEqual([b"abc"], l)

    def test_override_destructor(self):
        # Records the order in which __del__, close() and flush() run.
        record = []
        class MyTextIO(self.TextIOWrapper):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        b = self.BytesIO()
        t = MyTextIO(b, encoding="ascii")
        del t
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            self.TextIOWrapper(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception IOError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    # Systematic tests of the text I/O API

    def test_basic_io(self):
        for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
            for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
                # 'with' closes the file even when an assertion fails.
                with self.open(support.TESTFN, "w+", encoding=enc) as f:
                    f._CHUNK_SIZE = chunksize
                    self.assertEqual(f.write("abc"), 3)
                with self.open(support.TESTFN, "r+", encoding=enc) as f:
                    f._CHUNK_SIZE = chunksize
                    self.assertEqual(f.tell(), 0)
                    self.assertEqual(f.read(), "abc")
                    cookie = f.tell()
                    self.assertEqual(f.seek(0), 0)
                    self.assertEqual(f.read(None), "abc")
                    f.seek(0)
                    self.assertEqual(f.read(2), "ab")
                    self.assertEqual(f.read(1), "c")
                    self.assertEqual(f.read(1), "")
                    self.assertEqual(f.read(), "")
                    self.assertEqual(f.tell(), cookie)
                    self.assertEqual(f.seek(0), 0)
                    self.assertEqual(f.seek(0, 2), cookie)
                    self.assertEqual(f.write("def"), 3)
                    self.assertEqual(f.seek(cookie), cookie)
                    self.assertEqual(f.read(), "def")
                    if enc.startswith("utf"):
                        self.multi_line_test(f, enc)

    def multi_line_test(self, f, enc):
        # Helper for test_basic_io (not collected as a test itself).
        # Writes lines of many lengths, remembering tell() before each one,
        # then checks that reading back yields the same (position, line)
        # pairs.  enc is accepted but not used here.
        f.seek(0)
        f.truncate()
        sample = "s\xff\u0fff\uffff"
        wlines = []
        for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
            chars = []
            for i in range(size):
                chars.append(sample[i % len(sample)])
            line = "".join(chars) + "\n"
            wlines.append((f.tell(), line))
            f.write(line)
        f.seek(0)
        rlines = []
        while True:
            pos = f.tell()
            line = f.readline()
            if not line:
                break
            rlines.append((pos, line))
        self.assertEqual(rlines, wlines)

    def test_telling(self):
        with self.open(support.TESTFN, "w+", encoding="utf8") as f:
            p0 = f.tell()
            f.write("\xff\n")
            p1 = f.tell()
            f.write("\xff\n")
            p2 = f.tell()
            f.seek(0)
            self.assertEqual(f.tell(), p0)
            self.assertEqual(f.readline(), "\xff\n")
            self.assertEqual(f.tell(), p1)
            self.assertEqual(f.readline(), "\xff\n")
            self.assertEqual(f.tell(), p2)
            f.seek(0)
            for line in f:
                self.assertEqual(line, "\xff\n")
                # tell() is expected to be refused during iteration.
                self.assertRaises(IOError, f.tell)
            self.assertEqual(f.tell(), p2)

    def test_seeking(self):
        # Build a line whose multi-byte character sits right at the end of
        # the first chunk (prefix is chunk_size - 2 ASCII bytes).
        chunk_size = _default_chunk_size()
        prefix_size = chunk_size - 2
        u_prefix = "a" * prefix_size
        prefix = u_prefix.encode("utf-8")
        self.assertEqual(len(u_prefix), len(prefix))
        u_suffix = "\u8888\n"
        suffix = u_suffix.encode("utf-8")
        line = prefix + suffix
        with self.open(support.TESTFN, "wb") as f:
            f.write(line*2)
        with self.open(support.TESTFN, "r", encoding="utf-8") as f:
            s = f.read(prefix_size)
            self.assertEqual(s, str(prefix, "ascii"))
            self.assertEqual(f.tell(), prefix_size)
            self.assertEqual(f.readline(), u_suffix)

    def test_seeking_too(self):
        # Regression test for a specific bug
        data = b'\xe0\xbf\xbf\n'
        with self.open(support.TESTFN, "wb") as f:
            f.write(data)
        with self.open(support.TESTFN, "r", encoding="utf-8") as f:
            f._CHUNK_SIZE  # Just test that it exists
            f._CHUNK_SIZE = 2
            f.readline()
            f.tell()

    def test_seek_and_tell(self):
        #Test seek/tell using the StatefulIncrementalDecoder.
        # Make test faster by doing smaller seeks
        CHUNK_SIZE = 128

        def test_seek_and_tell_with_data(data, min_pos=0):
            """Tell/seek to various points within a data stream and ensure
            that the decoded data returned by read() is consistent."""
            with self.open(support.TESTFN, 'wb') as f:
                f.write(data)
            with self.open(support.TESTFN, encoding='test_decoder') as f:
                f._CHUNK_SIZE = CHUNK_SIZE
                decoded = f.read()

            for i in range(min_pos, len(decoded) + 1):  # seek positions
                for j in [1, 5, len(decoded) - i]:  # read lengths
                    with self.open(support.TESTFN,
                                   encoding='test_decoder') as f:
                        self.assertEqual(f.read(i), decoded[:i])
                        cookie = f.tell()
                        self.assertEqual(f.read(j), decoded[i:i + j])
                        f.seek(cookie)
                        self.assertEqual(f.read(), decoded[i:])

        # Enable the test decoder.
        StatefulIncrementalDecoder.codecEnabled = 1

        # Run the tests.
        try:
            # Try each test case.
            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
                test_seek_and_tell_with_data(input)

            # Position each test case so that it crosses a chunk boundary.
            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
                offset = CHUNK_SIZE - len(input)//2
                prefix = b'.'*offset
                # Don't bother seeking into the prefix (takes too long).
                min_pos = offset*2
                test_seek_and_tell_with_data(prefix + input, min_pos)

        # Ensure our test decoder won't interfere with subsequent tests.
        finally:
            StatefulIncrementalDecoder.codecEnabled = 0

    def test_encoded_writes(self):
        data = "1234567890"
        tests = ("utf-16",
                 "utf-16-le",
                 "utf-16-be",
                 "utf-32",
                 "utf-32-le",
                 "utf-32-be")
        for encoding in tests:
            buf = self.BytesIO()
            f = self.TextIOWrapper(buf, encoding=encoding)
            # Check if the BOM is written only once (see issue1753).
            f.write(data)
            f.write(data)
            f.seek(0)
            self.assertEqual(f.read(), data * 2)
            f.seek(0)
            self.assertEqual(f.read(), data * 2)
            self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))

    def test_unreadable(self):
        class UnReadable(self.BytesIO):
            def readable(self):
                return False
        txt = self.TextIOWrapper(UnReadable())
        self.assertRaises(IOError, txt.read)

    def test_read_one_by_one(self):
        txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
        reads = ""
        while True:
            c = txt.read(1)
            if not c:
                break
            reads += c
        self.assertEqual(reads, "AA\nBB")

    def test_readlines(self):
        txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
        self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
        txt.seek(0)
        self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
        txt.seek(0)
        self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])

    # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
    def test_read_by_chunk(self):
        # make sure "\r\n" straddles 128 char boundary.
        txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
        reads = ""
        while True:
            c = txt.read(128)
            if not c:
                break
            reads += c
        self.assertEqual(reads, "A"*127+"\nB")

    def test_issue1395_1(self):
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")

        # read one char at a time
        reads = ""
        while True:
            c = txt.read(1)
            if not c:
                break
            reads += c
        self.assertEqual(reads, self.normalized)

    def test_issue1395_2(self):
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        reads = ""
        while True:
            c = txt.read(4)
            if not c:
                break
            reads += c
        self.assertEqual(reads, self.normalized)

    def test_issue1395_3(self):
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        reads = txt.read(4)
        reads += txt.read(4)
        reads += txt.readline()
        reads += txt.readline()
        reads += txt.readline()
        self.assertEqual(reads, self.normalized)

    def test_issue1395_4(self):
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        reads = txt.read(4)
        reads += txt.read()
        self.assertEqual(reads, self.normalized)

    def test_issue1395_5(self):
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        # The returned text is not needed; the read only advances the
        # stream so that tell() yields a mid-file position.
        reads = txt.read(4)
        pos = txt.tell()
        txt.seek(0)
        txt.seek(pos)
        self.assertEqual(txt.read(4), "BBB\n")

    def test_issue2282(self):
        buffer = self.BytesIO(self.testdata)
        txt = self.TextIOWrapper(buffer, encoding="ascii")

        self.assertEqual(buffer.seekable(), txt.seekable())

    def test_append_bom(self):
        # The BOM is not written again when appending to a non-empty file
        filename = support.TESTFN
        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
            with self.open(filename, 'w', encoding=charset) as f:
                f.write('aaa')
                pos = f.tell()
            with self.open(filename, 'rb') as f:
                self.assertEqual(f.read(), 'aaa'.encode(charset))

            with self.open(filename, 'a', encoding=charset) as f:
                f.write('xxx')
            with self.open(filename, 'rb') as f:
                self.assertEqual(f.read(), 'aaaxxx'.encode(charset))

    def test_seek_bom(self):
        # Same test, but when seeking manually
        filename = support.TESTFN
        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
            with self.open(filename, 'w', encoding=charset) as f:
                f.write('aaa')
                pos = f.tell()
            with self.open(filename, 'r+', encoding=charset) as f:
                f.seek(pos)
                f.write('zzz')
                f.seek(0)
                f.write('bbb')
            with self.open(filename, 'rb') as f:
                self.assertEqual(f.read(), 'bbbzzz'.encode(charset))

    def test_errors_property(self):
        with self.open(support.TESTFN, "w") as f:
            self.assertEqual(f.errors, "strict")
        with self.open(support.TESTFN, "w", errors="replace") as f:
            self.assertEqual(f.errors, "replace")


    def test_threads_write(self):
        # Issue6750: concurrent writes could duplicate data
        event = threading.Event()
        with self.open(support.TESTFN, "w", buffering=1) as f:
            def run(n):
                text = "Thread%03d\n" % n
                event.wait()
                f.write(text)
            # n=x binds the current x as a default so each thread gets
            # its own number.
            threads = [threading.Thread(target=lambda n=x: run(n))
                       for x in range(20)]
            for t in threads:
                t.start()
            # Give the threads time to block on the event, then release
            # them all at once.
            time.sleep(0.02)
            event.set()
            for t in threads:
                t.join()
        with self.open(support.TESTFN) as f:
            content = f.read()
            for n in range(20):
                self.assertEqual(content.count("Thread%03d\n" % n), 1)
2105
class CTextIOWrapperTest(TextIOWrapperTest):
    # Adds two tests on top of everything inherited from TextIOWrapperTest.

    def test_initialization(self):
        # After each rejected re-initialization the wrapper must refuse
        # to read.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        for expected_error, bad_newline in ((TypeError, 42),
                                            (ValueError, 'xyzzy')):
            self.assertRaises(expected_error, wrapper.__init__, buffered,
                              newline=bad_newline)
            self.assertRaises(ValueError, wrapper.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        raw_file = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(raw_file)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        # Self-reference: the object now sits in a reference cycle, so only
        # the cycle collector can reclaim it.
        wrapper.x = wrapper
        ref = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertTrue(ref() is None, ref)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")
2132
class PyTextIOWrapperTest(TextIOWrapperTest):
    """Run the inherited TextIOWrapper tests unchanged.

    test_main() attaches the ``pyio`` names to every test class whose
    name starts with "Py".
    """
2135
2136
class IncrementalNewlineDecoderTest(unittest.TestCase):
    """Tests for IncrementalNewlineDecoder.

    The class under test is read from ``self.IncrementalNewlineDecoder``,
    which test_main() sets on the C and Py subclasses.
    """

    def check_newline_decoding_utf8(self, decoder):
        """Run UTF-8 specific checks against a newline *decoder*.

        The calls below are order-dependent: each one continues from the
        decoder state left behind by the previous call.
        """
        def _check_decode(b, s, **kwargs):
            # We exercise getstate() / setstate() as well as decode():
            # decoding the same input again from the saved state must give
            # the same output.
            state = decoder.getstate()
            self.assertEqual(decoder.decode(b, **kwargs), s)
            decoder.setstate(state)
            self.assertEqual(decoder.decode(b, **kwargs), s)

        _check_decode(b'\xe8\xa2\x88', "\u8888")

        # The same character fed one byte at a time: output appears only
        # once the last byte has arrived.
        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        # A truncated sequence must be reported when the input is final.
        _check_decode(b'\xe8', "")
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        _check_decode(b'\n', "\n")
        # A lone '\r' is held back until the next input (or final=True).
        _check_decode(b'\r', "")
        _check_decode(b'', "\n", final=True)
        _check_decode(b'\r', "\n", final=True)

        _check_decode(b'\r', "")
        _check_decode(b'a', "\na")

        _check_decode(b'\r\r\n', "\n\n")
        _check_decode(b'\r', "")
        _check_decode(b'\r', "\n")
        _check_decode(b'\na', "\na")

        _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
        _check_decode(b'\xe8\xa2\x88', "\u8888")
        _check_decode(b'\n', "\n")
        _check_decode(b'\xe8\xa2\x88\r', "\u8888")
        _check_decode(b'\n', "\n")

    def check_newline_decoding(self, decoder, encoding):
        """Feed *decoder* one unit at a time and check ``newlines`` tracking.

        When *encoding* is not None the text is first encoded with that
        codec and fed one byte at a time; when it is None the decoder is
        fed the text itself, one character at a time.
        """
        result = []
        if encoding is not None:
            encoder = codecs.getincrementalencoder(encoding)()
            def _decode_bytewise(s):
                # Decode one byte at a time
                for b in encoder.encode(s):
                    result.append(decoder.decode(bytes([b])))
        else:
            encoder = None
            def _decode_bytewise(s):
                # Decode one char at a time
                for c in s:
                    result.append(decoder.decode(c))
        self.assertEqual(decoder.newlines, None)
        _decode_bytewise("abc\n\r")
        self.assertEqual(decoder.newlines, '\n')
        _decode_bytewise("\nabc")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc")
        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
        # After reset() the record of newlines seen so far must be cleared.
        decoder.reset()
        data = "abc"
        if encoder is not None:
            encoder.reset()
            data = encoder.encode(data)
        self.assertEqual(decoder.decode(data), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        """Run the generic checks for several encodings, then the UTF-8 ones."""
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc is None:
                decoder = None
            else:
                decoder = codecs.getincrementaldecoder(enc)()
            decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
            self.check_newline_decoding(decoder, enc)
        decoder = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
        self.check_newline_decoding_utf8(decoder)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def _check(dec):
            # U+0D00 and U+0A00 must pass through unchanged and must not
            # be recorded as newlines.
            self.assertEqual(dec.newlines, None)
            self.assertEqual(dec.decode("\u0D00"), "\u0D00")
            self.assertEqual(dec.newlines, None)
            self.assertEqual(dec.decode("\u0A00"), "\u0A00")
            self.assertEqual(dec.newlines, None)
        dec = self.IncrementalNewlineDecoder(None, translate=False)
        _check(dec)
        dec = self.IncrementalNewlineDecoder(None, translate=True)
        _check(dec)
2242
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Run the inherited newline-decoder tests unchanged.

    test_main() attaches the ``io`` names to every test class whose name
    starts with "C".
    """
2245
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Run the inherited newline-decoder tests unchanged.

    test_main() attaches the ``pyio`` names to every test class whose
    name starts with "Py".
    """
Antoine Pitrou180a3362008-12-14 16:36:46 +00002248
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002249
Guido van Rossum01a27522007-03-07 01:00:12 +00002250# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002251
class MiscIOTest(unittest.TestCase):
    """Miscellaneous checks on the io namespace and on open().

    Subclasses provide ``io`` (the module under test); test_main() sets
    the remaining names used here (``open``, ``IOBase``,
    ``BlockingIOError``, ...) on those subclasses.
    """

    def tearDown(self):
        support.unlink(support.TESTFN)

    def test___all__(self):
        """Every name in ``__all__`` exists and has the expected kind."""
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertTrue(obj is not None, name)
            if name == "open":
                continue
            elif "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase))

    def test_attributes(self):
        """Check ``name`` and ``mode`` on each layer returned by open().

        Files are opened in ``with`` blocks so that a failing assertion
        cannot leave them open when tearDown() unlinks TESTFN.
        """
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        with self.open(support.TESTFN, "U") as f:
            self.assertEqual(f.name, support.TESTFN)
            self.assertEqual(f.buffer.name, support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode, "U")
            self.assertEqual(f.buffer.mode, "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        with self.open(support.TESTFN, "w+") as f:
            self.assertEqual(f.mode, "w+")
            self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
            self.assertEqual(f.buffer.raw.mode, "rb+")

            # A second file object on the same descriptor; closefd=False
            # means closing it leaves the descriptor owned by ``f`` open.
            with self.open(f.fileno(), "wb", closefd=False) as g:
                self.assertEqual(g.mode, "wb")
                self.assertEqual(g.raw.mode, "wb")
                self.assertEqual(g.name, f.fileno())
                self.assertEqual(g.raw.name, f.fileno())

    def test_io_after_close(self):
        """Every I/O method raises ValueError once the file is closed."""
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            # peek, read1 and readinto are not offered by every layer, so
            # they are only checked where present.
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            # Pass bytes to binary files and str to text files.
            self.assertRaises(ValueError, f.write,
                              b"" if "b" in kwargs['mode'] else "")
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        self.assertRaises(TypeError, self.BlockingIOError)
        self.assertRaises(TypeError, self.BlockingIOError, 1)
        self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
        self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
        b = self.BlockingIOError(1, "")
        self.assertEqual(b.characters_written, 0)
        class C(str):
            pass
        c = C("")
        b = self.BlockingIOError(1, c)
        # Build a reference cycle between the exception and its argument,
        # then check that the garbage collector reclaims it.
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertTrue(wr() is None, wr)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        self.assertIsInstance(self.IOBase, abc.ABCMeta)
        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        """Check which ABCs of *abcmodule* each kind of opened file matches.

        *abcmodule* is any object exposing ``IOBase``, ``RawIOBase``,
        ``BufferedIOBase`` and ``TextIOBase`` attributes.
        """
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "wb") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "w") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertIsInstance(f, abcmodule.TextIOBase)

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs.
        # ``self`` stands in for the module: it carries the ABC names of
        # the implementation under test as attributes.
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)
2387
class CMiscIOTest(MiscIOTest):
    """Run the MiscIOTest suite with ``self.io`` bound to the ``io`` module."""

    io = io
2390
class PyMiscIOTest(MiscIOTest):
    """Run the MiscIOTest suite with ``self.io`` bound to the ``pyio`` module."""

    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002393
def test_main():
    """Attach the implementation under test to each class, then run them all.

    Classes whose name starts with "C" receive the names taken from
    ``io``; classes whose name starts with "Py" receive the same names
    taken from ``pyio``.  Any other class is run without modification.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,)

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = dict((member, getattr(io, member)) for member in all_members)
    py_io_ns = dict((member, getattr(pyio, member)) for member in all_members)

    # Each mock is published under its plain name but resolved to the
    # "C"- or "Py"-prefixed class of that name defined in this module.
    module_globals = globals()
    for mock in mocks:
        c_io_ns[mock.__name__] = module_globals["C" + mock.__name__]
    for mock in mocks:
        py_io_ns[mock.__name__] = module_globals["Py" + mock.__name__]

    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper

    for test in tests:
        class_name = test.__name__
        if class_name.startswith("C"):
            namespace = c_io_ns
        elif class_name.startswith("Py"):
            namespace = py_io_ns
        else:
            continue
        for attr_name, attr_value in namespace.items():
            setattr(test, attr_name, attr_value)

    support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002426
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()