blob: ef3fed9dc695ef0df732d8f2c10f5d3f01718a2b [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import threading
27import random
Guido van Rossum28524c72007-02-27 05:47:44 +000028import unittest
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000029import weakref
30import gc
31import abc
32from itertools import chain, cycle, count
33from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000034from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000035
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000036import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000037import io # C implementation of io
38import _pyio as pyio # Python implementation of io
Guido van Rossum28524c72007-02-27 05:47:44 +000039
Guido van Rossuma9e20242007-03-08 00:43:48 +000040
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041def _default_chunk_size():
42 """Get the default TextIOWrapper chunk size"""
43 with open(__file__, "r", encoding="latin1") as f:
44 return f._CHUNK_SIZE
45
46
47class MockRawIO:
Guido van Rossuma9e20242007-03-08 00:43:48 +000048
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000049 def __init__(self, read_stack=()):
50 self._read_stack = list(read_stack)
51 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000052 self._reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000053
54 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000055 self._reads += 1
Guido van Rossum68bbcd22007-02-27 17:19:33 +000056 try:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000057 return self._read_stack.pop(0)
Guido van Rossum68bbcd22007-02-27 17:19:33 +000058 except:
Guido van Rossum78892e42007-04-06 17:31:18 +000059 return b""
Guido van Rossum68bbcd22007-02-27 17:19:33 +000060
Guido van Rossum01a27522007-03-07 01:00:12 +000061 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000062 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000063 return len(b)
64
65 def writable(self):
66 return True
67
Guido van Rossum68bbcd22007-02-27 17:19:33 +000068 def fileno(self):
69 return 42
70
71 def readable(self):
72 return True
73
Guido van Rossum01a27522007-03-07 01:00:12 +000074 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000075 return True
76
Guido van Rossum01a27522007-03-07 01:00:12 +000077 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000078 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000079
80 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000081 return 0 # same comment as above
82
83 def readinto(self, buf):
84 self._reads += 1
85 max_len = len(buf)
86 try:
87 data = self._read_stack[0]
88 except IndexError:
89 return 0
90 if data is None:
91 del self._read_stack[0]
92 return None
93 n = len(data)
94 if len(data) <= max_len:
95 del self._read_stack[0]
96 buf[:n] = data
97 return n
98 else:
99 buf[:] = data[:max_len]
100 self._read_stack[0] = data[max_len:]
101 return max_len
102
103 def truncate(self, pos=None):
104 return pos
105
106class CMockRawIO(MockRawIO, io.RawIOBase):
107 pass
108
109class PyMockRawIO(MockRawIO, pyio.RawIOBase):
110 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000111
Guido van Rossuma9e20242007-03-08 00:43:48 +0000112
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000113class MisbehavedRawIO(MockRawIO):
114 def write(self, b):
115 return super().write(b) * 2
116
117 def read(self, n=None):
118 return super().read(n) * 2
119
120 def seek(self, pos, whence):
121 return -123
122
123 def tell(self):
124 return -456
125
126 def readinto(self, buf):
127 super().readinto(buf)
128 return len(buf) * 5
129
130class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
131 pass
132
133class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
134 pass
135
136
137class CloseFailureIO(MockRawIO):
138 closed = 0
139
140 def close(self):
141 if not self.closed:
142 self.closed = 1
143 raise IOError
144
145class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
146 pass
147
148class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
149 pass
150
151
152class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000153
154 def __init__(self, data):
155 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000156 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000157
158 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000159 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000160 self.read_history.append(None if res is None else len(res))
161 return res
162
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000163 def readinto(self, b):
164 res = super().readinto(b)
165 self.read_history.append(res)
166 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000167
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000168class CMockFileIO(MockFileIO, io.BytesIO):
169 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000170
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000171class PyMockFileIO(MockFileIO, pyio.BytesIO):
172 pass
173
174
175class MockNonBlockWriterIO:
176
177 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000178 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000179 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000180
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000181 def pop_written(self):
182 s = b"".join(self._write_stack)
183 self._write_stack[:] = []
184 return s
185
186 def block_on(self, char):
187 """Block when a given char is encountered."""
188 self._blocker_char = char
189
190 def readable(self):
191 return True
192
193 def seekable(self):
194 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000195
Guido van Rossum01a27522007-03-07 01:00:12 +0000196 def writable(self):
197 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000198
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000199 def write(self, b):
200 b = bytes(b)
201 n = -1
202 if self._blocker_char:
203 try:
204 n = b.index(self._blocker_char)
205 except ValueError:
206 pass
207 else:
208 self._blocker_char = None
209 self._write_stack.append(b[:n])
210 raise self.BlockingIOError(0, "test blocking", n)
211 self._write_stack.append(b)
212 return len(b)
213
214class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
215 BlockingIOError = io.BlockingIOError
216
217class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
218 BlockingIOError = pyio.BlockingIOError
219
Guido van Rossuma9e20242007-03-08 00:43:48 +0000220
Guido van Rossum28524c72007-02-27 05:47:44 +0000221class IOTest(unittest.TestCase):
222
Neal Norwitze7789b12008-03-24 06:18:09 +0000223 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000224 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000225
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000226 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000227 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000228
Guido van Rossum28524c72007-02-27 05:47:44 +0000229 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000230 self.assertEqual(f.write(b"blah."), 5)
231 self.assertEqual(f.seek(0), 0)
232 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000233 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000234 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000235 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000236 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000237 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000238 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000239 self.assertEqual(f.seek(-1, 2), 13)
240 self.assertEqual(f.tell(), 13)
241 self.assertEqual(f.truncate(12), 12)
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000242 self.assertEqual(f.tell(), 12)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000243 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000244
Guido van Rossum9b76da62007-04-11 01:09:03 +0000245 def read_ops(self, f, buffered=False):
246 data = f.read(5)
247 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000248 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000249 self.assertEqual(f.readinto(data), 5)
250 self.assertEqual(data, b" worl")
251 self.assertEqual(f.readinto(data), 2)
252 self.assertEqual(len(data), 5)
253 self.assertEqual(data[:2], b"d\n")
254 self.assertEqual(f.seek(0), 0)
255 self.assertEqual(f.read(20), b"hello world\n")
256 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000257 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000258 self.assertEqual(f.seek(-6, 2), 6)
259 self.assertEqual(f.read(5), b"world")
260 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000261 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000262 self.assertEqual(f.seek(-6, 1), 5)
263 self.assertEqual(f.read(5), b" worl")
264 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000265 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000266 if buffered:
267 f.seek(0)
268 self.assertEqual(f.read(), b"hello world\n")
269 f.seek(6)
270 self.assertEqual(f.read(), b"world\n")
271 self.assertEqual(f.read(), b"")
272
Guido van Rossum34d69e52007-04-10 20:08:41 +0000273 LARGE = 2**31
274
Guido van Rossum53807da2007-04-10 19:01:47 +0000275 def large_file_ops(self, f):
276 assert f.readable()
277 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000278 self.assertEqual(f.seek(self.LARGE), self.LARGE)
279 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000280 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000281 self.assertEqual(f.tell(), self.LARGE + 3)
282 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000283 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000284 self.assertEqual(f.tell(), self.LARGE + 2)
285 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000286 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000287 self.assertEqual(f.tell(), self.LARGE + 1)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000288 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
289 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000290 self.assertEqual(f.read(2), b"x")
291
Guido van Rossum28524c72007-02-27 05:47:44 +0000292 def test_raw_file_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000293 f = self.open(support.TESTFN, "wb", buffering=0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000294 self.assertEqual(f.readable(), False)
295 self.assertEqual(f.writable(), True)
296 self.assertEqual(f.seekable(), True)
297 self.write_ops(f)
298 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000299 f = self.open(support.TESTFN, "rb", buffering=0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000300 self.assertEqual(f.readable(), True)
301 self.assertEqual(f.writable(), False)
302 self.assertEqual(f.seekable(), True)
303 self.read_ops(f)
304 f.close()
305
Guido van Rossum87429772007-04-10 21:06:59 +0000306 def test_buffered_file_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000307 f = self.open(support.TESTFN, "wb")
Guido van Rossum87429772007-04-10 21:06:59 +0000308 self.assertEqual(f.readable(), False)
309 self.assertEqual(f.writable(), True)
310 self.assertEqual(f.seekable(), True)
311 self.write_ops(f)
312 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000313 f = self.open(support.TESTFN, "rb")
Guido van Rossum87429772007-04-10 21:06:59 +0000314 self.assertEqual(f.readable(), True)
315 self.assertEqual(f.writable(), False)
316 self.assertEqual(f.seekable(), True)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000317 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000318 f.close()
319
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000320 def test_readline(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000321 f = io.open(support.TESTFN, "wb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000322 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000323 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000324 f = self.open(support.TESTFN, "rb")
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000325 self.assertEqual(f.readline(), b"abc\n")
326 self.assertEqual(f.readline(10), b"def\n")
327 self.assertEqual(f.readline(2), b"xy")
328 self.assertEqual(f.readline(4), b"zzy\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000329 self.assertEqual(f.readline(), b"foo\x00bar\n")
330 self.assertEqual(f.readline(), b"another line")
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000331 f.close()
332
Guido van Rossum28524c72007-02-27 05:47:44 +0000333 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000334 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000335 self.write_ops(f)
336 data = f.getvalue()
337 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000338 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000339 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000340
Guido van Rossum53807da2007-04-10 19:01:47 +0000341 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000342 # On Windows and Mac OSX this test comsumes large resources; It takes
343 # a long time to build the >2GB file and takes >2GB of disk space
344 # therefore the resource must be enabled to run this test.
345 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000346 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000347 print("\nTesting large file ops skipped on %s." % sys.platform,
348 file=sys.stderr)
349 print("It requires %d bytes and a long time." % self.LARGE,
350 file=sys.stderr)
351 print("Use 'regrtest.py -u largefile test_io' to run it.",
352 file=sys.stderr)
353 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000354 with self.open(support.TESTFN, "w+b", 0) as f:
355 self.large_file_ops(f)
356 with self.open(support.TESTFN, "w+b") as f:
357 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000358
359 def test_with_open(self):
360 for bufsize in (0, 1, 100):
361 f = None
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000362 with open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000363 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000364 self.assertEqual(f.closed, True)
365 f = None
366 try:
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000367 with open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000368 1/0
369 except ZeroDivisionError:
370 self.assertEqual(f.closed, True)
371 else:
372 self.fail("1/0 didn't raise an exception")
373
Antoine Pitrou08838b62009-01-21 00:55:13 +0000374 # issue 5008
375 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000376 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000377 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000378 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000379 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000380 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000381 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000382 with self.open(support.TESTFN, "a") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000383 self.assert_(f.tell() > 0)
384
Guido van Rossum87429772007-04-10 21:06:59 +0000385 def test_destructor(self):
386 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000387 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000388 def __del__(self):
389 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000390 try:
391 f = super().__del__
392 except AttributeError:
393 pass
394 else:
395 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000396 def close(self):
397 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000398 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000399 def flush(self):
400 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000401 super().flush()
402 f = MyFileIO(support.TESTFN, "wb")
403 f.write(b"xxx")
404 del f
405 self.assertEqual(record, [1, 2, 3])
406 f = open(support.TESTFN, "rb")
407 self.assertEqual(f.read(), b"xxx")
408
409 def _check_base_destructor(self, base):
410 record = []
411 class MyIO(base):
412 def __init__(self):
413 # This exercises the availability of attributes on object
414 # destruction.
415 # (in the C version, close() is called by the tp_dealloc
416 # function, not by __del__)
417 self.on_del = 1
418 self.on_close = 2
419 self.on_flush = 3
420 def __del__(self):
421 record.append(self.on_del)
422 try:
423 f = super().__del__
424 except AttributeError:
425 pass
426 else:
427 f()
428 def close(self):
429 record.append(self.on_close)
430 super().close()
431 def flush(self):
432 record.append(self.on_flush)
433 super().flush()
434 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000435 del f
436 self.assertEqual(record, [1, 2, 3])
437
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000438 def test_IOBase_destructor(self):
439 self._check_base_destructor(self.IOBase)
440
441 def test_RawIOBase_destructor(self):
442 self._check_base_destructor(self.RawIOBase)
443
444 def test_BufferedIOBase_destructor(self):
445 self._check_base_destructor(self.BufferedIOBase)
446
447 def test_TextIOBase_destructor(self):
448 self._check_base_destructor(self.TextIOBase)
449
Guido van Rossum87429772007-04-10 21:06:59 +0000450 def test_close_flushes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000451 f = self.open(support.TESTFN, "wb")
Guido van Rossum2b08b382007-05-08 20:18:39 +0000452 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000453 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000454 f = self.open(support.TESTFN, "rb")
Guido van Rossum87429772007-04-10 21:06:59 +0000455 self.assertEqual(f.read(), b"xxx")
456 f.close()
Guido van Rossuma9e20242007-03-08 00:43:48 +0000457
Guido van Rossumd4103952007-04-12 05:44:49 +0000458 def test_array_writes(self):
459 a = array.array('i', range(10))
Antoine Pitrouc3b39242009-01-03 16:59:18 +0000460 n = len(a.tostring())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000461 f = self.open(support.TESTFN, "wb", 0)
Guido van Rossumd4103952007-04-12 05:44:49 +0000462 self.assertEqual(f.write(a), n)
463 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000464 f = self.open(support.TESTFN, "wb")
Guido van Rossumd4103952007-04-12 05:44:49 +0000465 self.assertEqual(f.write(a), n)
466 f.close()
467
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000468 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000469 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000470 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000471
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000472 def test_read_closed(self):
473 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000474 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000475 with self.open(support.TESTFN, "r") as f:
476 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000477 self.assertEqual(file.read(), "egg\n")
478 file.seek(0)
479 file.close()
480 self.assertRaises(ValueError, file.read)
481
482 def test_no_closefd_with_filename(self):
483 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000484 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000485
486 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000487 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000488 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000489 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000490 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000491 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000492 self.assertEqual(file.buffer.raw.closefd, False)
493
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000494 def test_garbage_collection(self):
495 # FileIO objects are collected, and collecting them flushes
496 # all data to disk.
497 f = self.FileIO(support.TESTFN, "wb")
498 f.write(b"abcxxx")
499 f.f = f
500 wr = weakref.ref(f)
501 del f
502 gc.collect()
503 self.assert_(wr() is None, wr)
504 with open(support.TESTFN, "rb") as f:
505 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000506
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000507class CIOTest(IOTest):
508 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000509
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000510class PyIOTest(IOTest):
511 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000512
Guido van Rossuma9e20242007-03-08 00:43:48 +0000513
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000514class CommonBufferedTests:
515 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
516
517 def test_fileno(self):
518 rawio = self.MockRawIO()
519 bufio = self.tp(rawio)
520
521 self.assertEquals(42, bufio.fileno())
522
523 def test_no_fileno(self):
524 # XXX will we always have fileno() function? If so, kill
525 # this test. Else, write it.
526 pass
527
528 def test_invalid_args(self):
529 rawio = self.MockRawIO()
530 bufio = self.tp(rawio)
531 # Invalid whence
532 self.assertRaises(ValueError, bufio.seek, 0, -1)
533 self.assertRaises(ValueError, bufio.seek, 0, 3)
534
535 def test_override_destructor(self):
536 tp = self.tp
537 record = []
538 class MyBufferedIO(tp):
539 def __del__(self):
540 record.append(1)
541 try:
542 f = super().__del__
543 except AttributeError:
544 pass
545 else:
546 f()
547 def close(self):
548 record.append(2)
549 super().close()
550 def flush(self):
551 record.append(3)
552 super().flush()
553 rawio = self.MockRawIO()
554 bufio = MyBufferedIO(rawio)
555 writable = bufio.writable()
556 del bufio
557 if writable:
558 self.assertEqual(record, [1, 2, 3])
559 else:
560 self.assertEqual(record, [1, 2])
561
562 def test_context_manager(self):
563 # Test usability as a context manager
564 rawio = self.MockRawIO()
565 bufio = self.tp(rawio)
566 def _with():
567 with bufio:
568 pass
569 _with()
570 # bufio should now be closed, and using it a second time should raise
571 # a ValueError.
572 self.assertRaises(ValueError, _with)
573
574 def test_error_through_destructor(self):
575 # Test that the exception state is not modified by a destructor,
576 # even if close() fails.
577 rawio = self.CloseFailureIO()
578 def f():
579 self.tp(rawio).xyzzy
580 with support.captured_output("stderr") as s:
581 self.assertRaises(AttributeError, f)
582 s = s.getvalue().strip()
583 if s:
584 # The destructor *may* have printed an unraisable error, check it
585 self.assertEqual(len(s.splitlines()), 1)
586 self.assert_(s.startswith("Exception IOError: "), s)
587 self.assert_(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000588
589
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000590class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
591 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000592
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000593 def test_constructor(self):
594 rawio = self.MockRawIO([b"abc"])
595 bufio = self.tp(rawio)
596 bufio.__init__(rawio)
597 bufio.__init__(rawio, buffer_size=1024)
598 bufio.__init__(rawio, buffer_size=16)
599 self.assertEquals(b"abc", bufio.read())
600 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
601 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
602 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
603 rawio = self.MockRawIO([b"abc"])
604 bufio.__init__(rawio)
605 self.assertEquals(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000606
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000607 def test_read(self):
608 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
609 bufio = self.tp(rawio)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000610 self.assertEquals(b"abcdef", bufio.read(6))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000611 # Invalid args
612 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000613
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000614 def test_read1(self):
615 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
616 bufio = self.tp(rawio)
617 self.assertEquals(b"a", bufio.read(1))
618 self.assertEquals(b"b", bufio.read1(1))
619 self.assertEquals(rawio._reads, 1)
620 self.assertEquals(b"c", bufio.read1(100))
621 self.assertEquals(rawio._reads, 1)
622 self.assertEquals(b"d", bufio.read1(100))
623 self.assertEquals(rawio._reads, 2)
624 self.assertEquals(b"efg", bufio.read1(100))
625 self.assertEquals(rawio._reads, 3)
626 self.assertEquals(b"", bufio.read1(100))
627 # Invalid args
628 self.assertRaises(ValueError, bufio.read1, -1)
629
630 def test_readinto(self):
631 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
632 bufio = self.tp(rawio)
633 b = bytearray(2)
634 self.assertEquals(bufio.readinto(b), 2)
635 self.assertEquals(b, b"ab")
636 self.assertEquals(bufio.readinto(b), 2)
637 self.assertEquals(b, b"cd")
638 self.assertEquals(bufio.readinto(b), 2)
639 self.assertEquals(b, b"ef")
640 self.assertEquals(bufio.readinto(b), 1)
641 self.assertEquals(b, b"gf")
642 self.assertEquals(bufio.readinto(b), 0)
643 self.assertEquals(b, b"gf")
644
645 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000646 data = b"abcdefghi"
647 dlen = len(data)
648
649 tests = [
650 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
651 [ 100, [ 3, 3, 3], [ dlen ] ],
652 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
653 ]
654
655 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000656 rawio = self.MockFileIO(data)
657 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000658 pos = 0
659 for nbytes in buf_read_sizes:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000660 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000661 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000662 # this is mildly implementation-dependent
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000663 self.assertEquals(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000664
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000665 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000666 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000667 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
668 bufio = self.tp(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000669
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000670 self.assertEquals(b"abcd", bufio.read(6))
671 self.assertEquals(b"e", bufio.read(1))
672 self.assertEquals(b"fg", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000673 self.assertEquals(b"", bufio.peek(1))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000674 self.assert_(None is bufio.read())
675 self.assertEquals(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000676
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000677 def test_read_past_eof(self):
678 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
679 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000680
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000681 self.assertEquals(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000682
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000683 def test_read_all(self):
684 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
685 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000686
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000687 self.assertEquals(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000688
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000689 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000690 try:
691 # Write out many bytes with exactly the same number of 0's,
692 # 1's... 255's. This will help us check that concurrent reading
693 # doesn't duplicate or forget contents.
694 N = 1000
695 l = list(range(256)) * N
696 random.shuffle(l)
697 s = bytes(bytearray(l))
698 with io.open(support.TESTFN, "wb") as f:
699 f.write(s)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000700 with io.open(support.TESTFN, self.read_mode, buffering=0) as raw:
701 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000702 errors = []
703 results = []
704 def f():
705 try:
706 # Intra-buffer read then buffer-flushing read
707 for n in cycle([1, 19]):
708 s = bufio.read(n)
709 if not s:
710 break
711 # list.append() is atomic
712 results.append(s)
713 except Exception as e:
714 errors.append(e)
715 raise
716 threads = [threading.Thread(target=f) for x in range(20)]
717 for t in threads:
718 t.start()
719 time.sleep(0.02) # yield
720 for t in threads:
721 t.join()
722 self.assertFalse(errors,
723 "the following exceptions were caught: %r" % errors)
724 s = b''.join(results)
725 for i in range(256):
726 c = bytes(bytearray([i]))
727 self.assertEqual(s.count(c), N)
728 finally:
729 support.unlink(support.TESTFN)
730
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000731 def test_misbehaved_io(self):
732 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
733 bufio = self.tp(rawio)
734 self.assertRaises(IOError, bufio.seek, 0)
735 self.assertRaises(IOError, bufio.tell)
736
737class CBufferedReaderTest(BufferedReaderTest):
738 tp = io.BufferedReader
739
740 def test_constructor(self):
741 BufferedReaderTest.test_constructor(self)
742 # The allocation can succeed on 32-bit builds, e.g. with more
743 # than 2GB RAM and a 64-bit kernel.
744 if sys.maxsize > 0x7FFFFFFF:
745 rawio = self.MockRawIO()
746 bufio = self.tp(rawio)
747 self.assertRaises((OverflowError, MemoryError, ValueError),
748 bufio.__init__, rawio, sys.maxsize)
749
750 def test_initialization(self):
751 rawio = self.MockRawIO([b"abc"])
752 bufio = self.tp(rawio)
753 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
754 self.assertRaises(ValueError, bufio.read)
755 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
756 self.assertRaises(ValueError, bufio.read)
757 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
758 self.assertRaises(ValueError, bufio.read)
759
760 def test_misbehaved_io_read(self):
761 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
762 bufio = self.tp(rawio)
763 # _pyio.BufferedReader seems to implement reading different, so that
764 # checking this is not so easy.
765 self.assertRaises(IOError, bufio.read, 10)
766
767 def test_garbage_collection(self):
768 # C BufferedReader objects are collected.
769 # The Python version has __del__, so it ends into gc.garbage instead
770 rawio = self.FileIO(support.TESTFN, "w+b")
771 f = self.tp(rawio)
772 f.f = f
773 wr = weakref.ref(f)
774 del f
775 gc.collect()
776 self.assert_(wr() is None, wr)
777
778class PyBufferedReaderTest(BufferedReaderTest):
779 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000780
Guido van Rossuma9e20242007-03-08 00:43:48 +0000781
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000782class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
783 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +0000784
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000785 def test_constructor(self):
786 rawio = self.MockRawIO()
787 bufio = self.tp(rawio)
788 bufio.__init__(rawio)
789 bufio.__init__(rawio, buffer_size=1024)
790 bufio.__init__(rawio, buffer_size=16)
791 self.assertEquals(3, bufio.write(b"abc"))
792 bufio.flush()
793 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
794 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
795 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
796 bufio.__init__(rawio)
797 self.assertEquals(3, bufio.write(b"ghi"))
798 bufio.flush()
799 self.assertEquals(b"".join(rawio._write_stack), b"abcghi")
800
801 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000802 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000803 writer = self.MockRawIO()
804 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000805 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000806 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000807
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000808 def test_write_overflow(self):
809 writer = self.MockRawIO()
810 bufio = self.tp(writer, 8)
811 contents = b"abcdefghijklmnop"
812 for n in range(0, len(contents), 3):
813 bufio.write(contents[n:n+3])
814 flushed = b"".join(writer._write_stack)
815 # At least (total - 8) bytes were implicitly flushed, perhaps more
816 # depending on the implementation.
817 self.assert_(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000818
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000819 def check_writes(self, intermediate_func):
820 # Lots of writes, test the flushed output is as expected.
821 contents = bytes(range(256)) * 1000
822 n = 0
823 writer = self.MockRawIO()
824 bufio = self.tp(writer, 13)
825 # Generator of write sizes: repeat each N 15 times then proceed to N+1
826 def gen_sizes():
827 for size in count(1):
828 for i in range(15):
829 yield size
830 sizes = gen_sizes()
831 while n < len(contents):
832 size = min(next(sizes), len(contents) - n)
833 self.assertEquals(bufio.write(contents[n:n+size]), size)
834 intermediate_func(bufio)
835 n += size
836 bufio.flush()
837 self.assertEquals(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000838
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000839 def test_writes(self):
840 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000841
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000842 def test_writes_and_flushes(self):
843 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +0000844
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000845 def test_writes_and_seeks(self):
846 def _seekabs(bufio):
847 pos = bufio.tell()
848 bufio.seek(pos + 1, 0)
849 bufio.seek(pos - 1, 0)
850 bufio.seek(pos, 0)
851 self.check_writes(_seekabs)
852 def _seekrel(bufio):
853 pos = bufio.seek(0, 1)
854 bufio.seek(+1, 1)
855 bufio.seek(-1, 1)
856 bufio.seek(pos, 0)
857 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +0000858
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000859 def test_writes_and_truncates(self):
860 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +0000861
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000862 def test_write_non_blocking(self):
863 raw = self.MockNonBlockWriterIO()
864 bufio = self.tp(raw, 8, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +0000865
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000866 self.assertEquals(bufio.write(b"abcd"), 4)
867 self.assertEquals(bufio.write(b"efghi"), 5)
868 # 1 byte will be written, the rest will be buffered
869 raw.block_on(b"k")
870 self.assertEquals(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +0000871
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000872 # 8 bytes will be written, 8 will be buffered and the rest will be lost
873 raw.block_on(b"0")
874 try:
875 bufio.write(b"opqrwxyz0123456789")
876 except self.BlockingIOError as e:
877 written = e.characters_written
878 else:
879 self.fail("BlockingIOError should have been raised")
880 self.assertEquals(written, 16)
881 self.assertEquals(raw.pop_written(),
882 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +0000883
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000884 self.assertEquals(bufio.write(b"ABCDEFGHI"), 9)
885 s = raw.pop_written()
886 # Previously buffered bytes were flushed
887 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +0000888
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000889 def test_write_and_rewind(self):
890 raw = io.BytesIO()
891 bufio = self.tp(raw, 4)
892 self.assertEqual(bufio.write(b"abcdef"), 6)
893 self.assertEqual(bufio.tell(), 6)
894 bufio.seek(0, 0)
895 self.assertEqual(bufio.write(b"XY"), 2)
896 bufio.seek(6, 0)
897 self.assertEqual(raw.getvalue(), b"XYcdef")
898 self.assertEqual(bufio.write(b"123456"), 6)
899 bufio.flush()
900 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000901
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000902 def test_flush(self):
903 writer = self.MockRawIO()
904 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000905 bufio.write(b"abc")
906 bufio.flush()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000907 self.assertEquals(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000908
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000909 def test_destructor(self):
910 writer = self.MockRawIO()
911 bufio = self.tp(writer, 8)
912 bufio.write(b"abc")
913 del bufio
914 self.assertEquals(b"abc", writer._write_stack[0])
915
916 def test_truncate(self):
917 # Truncate implicitly flushes the buffer.
918 with io.open(support.TESTFN, self.write_mode, buffering=0) as raw:
919 bufio = self.tp(raw, 8)
920 bufio.write(b"abcdef")
921 self.assertEqual(bufio.truncate(3), 3)
922 self.assertEqual(bufio.tell(), 3)
923 with io.open(support.TESTFN, "rb", buffering=0) as f:
924 self.assertEqual(f.read(), b"abc")
925
926 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000927 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000928 # Write out many bytes from many threads and test they were
929 # all flushed.
930 N = 1000
931 contents = bytes(range(256)) * N
932 sizes = cycle([1, 19])
933 n = 0
934 queue = deque()
935 while n < len(contents):
936 size = next(sizes)
937 queue.append(contents[n:n+size])
938 n += size
939 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +0000940 # We use a real file object because it allows us to
941 # exercise situations where the GIL is released before
942 # writing the buffer to the raw streams. This is in addition
943 # to concurrency issues due to switching threads in the middle
944 # of Python code.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000945 with io.open(support.TESTFN, self.write_mode, buffering=0) as raw:
946 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000947 errors = []
948 def f():
949 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000950 while True:
951 try:
952 s = queue.popleft()
953 except IndexError:
954 return
Antoine Pitrou87695762008-08-14 22:44:29 +0000955 bufio.write(s)
956 except Exception as e:
957 errors.append(e)
958 raise
959 threads = [threading.Thread(target=f) for x in range(20)]
960 for t in threads:
961 t.start()
962 time.sleep(0.02) # yield
963 for t in threads:
964 t.join()
965 self.assertFalse(errors,
966 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000967 bufio.close()
968 with io.open(support.TESTFN, "rb") as f:
969 s = f.read()
970 for i in range(256):
971 self.assertEquals(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +0000972 finally:
973 support.unlink(support.TESTFN)
974
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000975 def test_misbehaved_io(self):
976 rawio = self.MisbehavedRawIO()
977 bufio = self.tp(rawio, 5)
978 self.assertRaises(IOError, bufio.seek, 0)
979 self.assertRaises(IOError, bufio.tell)
980 self.assertRaises(IOError, bufio.write, b"abcdef")
981
982class CBufferedWriterTest(BufferedWriterTest):
983 tp = io.BufferedWriter
984
985 def test_constructor(self):
986 BufferedWriterTest.test_constructor(self)
987 # The allocation can succeed on 32-bit builds, e.g. with more
988 # than 2GB RAM and a 64-bit kernel.
989 if sys.maxsize > 0x7FFFFFFF:
990 rawio = self.MockRawIO()
991 bufio = self.tp(rawio)
992 self.assertRaises((OverflowError, MemoryError, ValueError),
993 bufio.__init__, rawio, sys.maxsize)
994
995 def test_initialization(self):
996 rawio = self.MockRawIO()
997 bufio = self.tp(rawio)
998 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
999 self.assertRaises(ValueError, bufio.write, b"def")
1000 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1001 self.assertRaises(ValueError, bufio.write, b"def")
1002 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1003 self.assertRaises(ValueError, bufio.write, b"def")
1004
1005 def test_garbage_collection(self):
1006 # C BufferedWriter objects are collected, and collecting them flushes
1007 # all data to disk.
1008 # The Python version has __del__, so it ends into gc.garbage instead
1009 rawio = self.FileIO(support.TESTFN, "w+b")
1010 f = self.tp(rawio)
1011 f.write(b"123xxx")
1012 f.x = f
1013 wr = weakref.ref(f)
1014 del f
1015 gc.collect()
1016 self.assert_(wr() is None, wr)
1017 with open(support.TESTFN, "rb") as f:
1018 self.assertEqual(f.read(), b"123xxx")
1019
1020
1021class PyBufferedWriterTest(BufferedWriterTest):
1022 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001023
Guido van Rossum01a27522007-03-07 01:00:12 +00001024class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001025
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001026 def test_basic(self):
1027 r = self.MockRawIO(())
1028 w = self.MockRawIO()
1029 pair = self.tp(r, w)
Benjamin Peterson92035012008-12-27 16:00:54 +00001030 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001031
Benjamin Peterson92035012008-12-27 16:00:54 +00001032 # XXX More Tests
Guido van Rossum01a27522007-03-07 01:00:12 +00001033
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001034class CBufferedRWPairTest(BufferedRWPairTest):
1035 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001036
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001037class PyBufferedRWPairTest(BufferedRWPairTest):
1038 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001039
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001040
1041class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1042 read_mode = "rb+"
1043 write_mode = "wb+"
1044
1045 def test_constructor(self):
1046 BufferedReaderTest.test_constructor(self)
1047 BufferedWriterTest.test_constructor(self)
1048
1049 def test_read_and_write(self):
1050 raw = self.MockRawIO((b"asdf", b"ghjk"))
1051 rw = self.tp(raw, 8, 12)
Guido van Rossum01a27522007-03-07 01:00:12 +00001052
1053 self.assertEqual(b"as", rw.read(2))
1054 rw.write(b"ddd")
1055 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001056 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001057 self.assertEqual(b"ghjk", rw.read())
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001058 self.assertEquals(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001059
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001060 def test_seek_and_tell(self):
1061 raw = self.BytesIO(b"asdfghjkl")
1062 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001063
1064 self.assertEquals(b"as", rw.read(2))
1065 self.assertEquals(2, rw.tell())
1066 rw.seek(0, 0)
1067 self.assertEquals(b"asdf", rw.read(4))
1068
1069 rw.write(b"asdf")
1070 rw.seek(0, 0)
1071 self.assertEquals(b"asdfasdfl", rw.read())
1072 self.assertEquals(9, rw.tell())
1073 rw.seek(-4, 2)
1074 self.assertEquals(5, rw.tell())
1075 rw.seek(2, 1)
1076 self.assertEquals(7, rw.tell())
1077 self.assertEquals(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001078 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001079
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001080 def check_flush_and_read(self, read_func):
1081 raw = self.BytesIO(b"abcdefghi")
1082 bufio = self.tp(raw)
1083
1084 self.assertEquals(b"ab", read_func(bufio, 2))
1085 bufio.write(b"12")
1086 self.assertEquals(b"ef", read_func(bufio, 2))
1087 self.assertEquals(6, bufio.tell())
1088 bufio.flush()
1089 self.assertEquals(6, bufio.tell())
1090 self.assertEquals(b"ghi", read_func(bufio))
1091 raw.seek(0, 0)
1092 raw.write(b"XYZ")
1093 # flush() resets the read buffer
1094 bufio.flush()
1095 bufio.seek(0, 0)
1096 self.assertEquals(b"XYZ", read_func(bufio, 3))
1097
1098 def test_flush_and_read(self):
1099 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1100
1101 def test_flush_and_readinto(self):
1102 def _readinto(bufio, n=-1):
1103 b = bytearray(n if n >= 0 else 9999)
1104 n = bufio.readinto(b)
1105 return bytes(b[:n])
1106 self.check_flush_and_read(_readinto)
1107
1108 def test_flush_and_peek(self):
1109 def _peek(bufio, n=-1):
1110 # This relies on the fact that the buffer can contain the whole
1111 # raw stream, otherwise peek() can return less.
1112 b = bufio.peek(n)
1113 if n != -1:
1114 b = b[:n]
1115 bufio.seek(len(b), 1)
1116 return b
1117 self.check_flush_and_read(_peek)
1118
1119 def test_flush_and_write(self):
1120 raw = self.BytesIO(b"abcdefghi")
1121 bufio = self.tp(raw)
1122
1123 bufio.write(b"123")
1124 bufio.flush()
1125 bufio.write(b"45")
1126 bufio.flush()
1127 bufio.seek(0, 0)
1128 self.assertEquals(b"12345fghi", raw.getvalue())
1129 self.assertEquals(b"12345fghi", bufio.read())
1130
1131 def test_threads(self):
1132 BufferedReaderTest.test_threads(self)
1133 BufferedWriterTest.test_threads(self)
1134
1135 def test_writes_and_peek(self):
1136 def _peek(bufio):
1137 bufio.peek(1)
1138 self.check_writes(_peek)
1139 def _peek(bufio):
1140 pos = bufio.tell()
1141 bufio.seek(-1, 1)
1142 bufio.peek(1)
1143 bufio.seek(pos, 0)
1144 self.check_writes(_peek)
1145
1146 def test_writes_and_reads(self):
1147 def _read(bufio):
1148 bufio.seek(-1, 1)
1149 bufio.read(1)
1150 self.check_writes(_read)
1151
1152 def test_writes_and_read1s(self):
1153 def _read1(bufio):
1154 bufio.seek(-1, 1)
1155 bufio.read1(1)
1156 self.check_writes(_read1)
1157
1158 def test_writes_and_readintos(self):
1159 def _read(bufio):
1160 bufio.seek(-1, 1)
1161 bufio.readinto(bytearray(1))
1162 self.check_writes(_read)
1163
1164 def test_misbehaved_io(self):
1165 BufferedReaderTest.test_misbehaved_io(self)
1166 BufferedWriterTest.test_misbehaved_io(self)
1167
1168class CBufferedRandomTest(BufferedRandomTest):
1169 tp = io.BufferedRandom
1170
1171 def test_constructor(self):
1172 BufferedRandomTest.test_constructor(self)
1173 # The allocation can succeed on 32-bit builds, e.g. with more
1174 # than 2GB RAM and a 64-bit kernel.
1175 if sys.maxsize > 0x7FFFFFFF:
1176 rawio = self.MockRawIO()
1177 bufio = self.tp(rawio)
1178 self.assertRaises((OverflowError, MemoryError, ValueError),
1179 bufio.__init__, rawio, sys.maxsize)
1180
1181 def test_garbage_collection(self):
1182 CBufferedReaderTest.test_garbage_collection(self)
1183 CBufferedWriterTest.test_garbage_collection(self)
1184
1185class PyBufferedRandomTest(BufferedRandomTest):
1186 tp = pyio.BufferedRandom
1187
1188
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001189# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1190# properties:
1191# - A single output character can correspond to many bytes of input.
1192# - The number of input bytes to complete the character can be
1193# undetermined until the last input byte is received.
1194# - The number of input bytes can vary depending on previous input.
1195# - A single input byte can correspond to many characters of output.
1196# - The number of output characters can be undetermined until the
1197# last input byte is received.
1198# - The number of output characters can vary depending on previous input.
1199
1200class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1201 """
1202 For testing seek/tell behavior with a stateful, buffering decoder.
1203
1204 Input is a sequence of words. Words may be fixed-length (length set
1205 by input) or variable-length (period-terminated). In variable-length
1206 mode, extra periods are ignored. Possible words are:
1207 - 'i' followed by a number sets the input length, I (maximum 99).
1208 When I is set to 0, words are space-terminated.
1209 - 'o' followed by a number sets the output length, O (maximum 99).
1210 - Any other word is converted into a word followed by a period on
1211 the output. The output word consists of the input word truncated
1212 or padded out with hyphens to make its length equal to O. If O
1213 is 0, the word is output verbatim without truncating or padding.
1214 I and O are initially set to 1. When I changes, any buffered input is
1215 re-scanned according to the new I. EOF also terminates the last word.
1216 """
1217
1218 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001219 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001220 self.reset()
1221
1222 def __repr__(self):
1223 return '<SID %x>' % id(self)
1224
1225 def reset(self):
1226 self.i = 1
1227 self.o = 1
1228 self.buffer = bytearray()
1229
1230 def getstate(self):
1231 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1232 return bytes(self.buffer), i*100 + o
1233
1234 def setstate(self, state):
1235 buffer, io = state
1236 self.buffer = bytearray(buffer)
1237 i, o = divmod(io, 100)
1238 self.i, self.o = i ^ 1, o ^ 1
1239
1240 def decode(self, input, final=False):
1241 output = ''
1242 for b in input:
1243 if self.i == 0: # variable-length, terminated with period
1244 if b == ord('.'):
1245 if self.buffer:
1246 output += self.process_word()
1247 else:
1248 self.buffer.append(b)
1249 else: # fixed-length, terminate after self.i bytes
1250 self.buffer.append(b)
1251 if len(self.buffer) == self.i:
1252 output += self.process_word()
1253 if final and self.buffer: # EOF terminates the last word
1254 output += self.process_word()
1255 return output
1256
1257 def process_word(self):
1258 output = ''
1259 if self.buffer[0] == ord('i'):
1260 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1261 elif self.buffer[0] == ord('o'):
1262 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1263 else:
1264 output = self.buffer.decode('ascii')
1265 if len(output) < self.o:
1266 output += '-'*self.o # pad out with hyphens
1267 if self.o:
1268 output = output[:self.o] # truncate to output length
1269 output += '.'
1270 self.buffer = bytearray()
1271 return output
1272
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001273 codecEnabled = False
1274
1275 @classmethod
1276 def lookupTestDecoder(cls, name):
1277 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001278 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001279 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001280 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001281 incrementalencoder=None,
1282 streamreader=None, streamwriter=None,
1283 incrementaldecoder=cls)
1284
1285# Register the previous decoder for testing.
1286# Disabled by default, tests will enable it.
1287codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1288
1289
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001290class StatefulIncrementalDecoderTest(unittest.TestCase):
1291 """
1292 Make sure the StatefulIncrementalDecoder actually works.
1293 """
1294
1295 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001296 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001297 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001298 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001299 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001300 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001301 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001302 # I=0, O=6 (variable-length input, fixed-length output)
1303 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1304 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001305 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001306 # I=6, O=3 (fixed-length input > fixed-length output)
1307 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1308 # I=0, then 3; O=29, then 15 (with longer output)
1309 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1310 'a----------------------------.' +
1311 'b----------------------------.' +
1312 'cde--------------------------.' +
1313 'abcdefghijabcde.' +
1314 'a.b------------.' +
1315 '.c.------------.' +
1316 'd.e------------.' +
1317 'k--------------.' +
1318 'l--------------.' +
1319 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001320 ]
1321
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001322 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001323 # Try a few one-shot test cases.
1324 for input, eof, output in self.test_cases:
1325 d = StatefulIncrementalDecoder()
1326 self.assertEquals(d.decode(input, eof), output)
1327
1328 # Also test an unfinished decode, followed by forcing EOF.
1329 d = StatefulIncrementalDecoder()
1330 self.assertEquals(d.decode(b'oiabcd'), '')
1331 self.assertEquals(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001332
1333class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001334
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001335 def setUp(self):
1336 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1337 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001338 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001339
Guido van Rossumd0712812007-04-11 16:32:43 +00001340 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001341 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001342
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001343 def test_constructor(self):
1344 r = self.BytesIO(b"\xc3\xa9\n\n")
1345 b = self.BufferedReader(r, 1000)
1346 t = self.TextIOWrapper(b)
1347 t.__init__(b, encoding="latin1", newline="\r\n")
1348 self.assertEquals(t.encoding, "latin1")
1349 self.assertEquals(t.line_buffering, False)
1350 t.__init__(b, encoding="utf8", line_buffering=True)
1351 self.assertEquals(t.encoding, "utf8")
1352 self.assertEquals(t.line_buffering, True)
1353 self.assertEquals("\xe9\n", t.readline())
1354 self.assertRaises(TypeError, t.__init__, b, newline=42)
1355 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1356
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001357 def test_repr(self):
1358 raw = self.BytesIO("hello".encode("utf-8"))
1359 b = self.BufferedReader(raw)
1360 t = self.TextIOWrapper(b, encoding="utf-8")
1361 self.assertEqual(repr(t), "<TextIOWrapper encoding=utf-8>")
1362
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001363 def test_line_buffering(self):
1364 r = self.BytesIO()
1365 b = self.BufferedWriter(r, 1000)
1366 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001367 t.write("X")
1368 self.assertEquals(r.getvalue(), b"") # No flush happened
1369 t.write("Y\nZ")
1370 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
1371 t.write("A\rB")
1372 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
1373
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001374 def test_encoding(self):
1375 # Check the encoding attribute is always set, and valid
1376 b = self.BytesIO()
1377 t = self.TextIOWrapper(b, encoding="utf8")
1378 self.assertEqual(t.encoding, "utf8")
1379 t = self.TextIOWrapper(b)
1380 self.assert_(t.encoding is not None)
1381 codecs.lookup(t.encoding)
1382
1383 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001384 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001385 b = self.BytesIO(b"abc\n\xff\n")
1386 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001387 self.assertRaises(UnicodeError, t.read)
1388 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001389 b = self.BytesIO(b"abc\n\xff\n")
1390 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001391 self.assertRaises(UnicodeError, t.read)
1392 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001393 b = self.BytesIO(b"abc\n\xff\n")
1394 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001395 self.assertEquals(t.read(), "abc\n\n")
1396 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001397 b = self.BytesIO(b"abc\n\xff\n")
1398 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001399 self.assertEquals(t.read(), "abc\n\ufffd\n")
1400
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001401 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001402 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001403 b = self.BytesIO()
1404 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001405 self.assertRaises(UnicodeError, t.write, "\xff")
1406 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001407 b = self.BytesIO()
1408 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001409 self.assertRaises(UnicodeError, t.write, "\xff")
1410 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001411 b = self.BytesIO()
1412 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001413 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001414 t.write("abc\xffdef\n")
1415 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001416 self.assertEquals(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001417 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001418 b = self.BytesIO()
1419 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001420 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001421 t.write("abc\xffdef\n")
1422 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001423 self.assertEquals(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001424
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001425 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001426 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1427
1428 tests = [
1429 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001430 [ '', input_lines ],
1431 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1432 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1433 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001434 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001435 encodings = (
1436 'utf-8', 'latin-1',
1437 'utf-16', 'utf-16-le', 'utf-16-be',
1438 'utf-32', 'utf-32-le', 'utf-32-be',
1439 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001440
Guido van Rossum8358db22007-08-18 21:39:55 +00001441 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001442 # character in TextIOWrapper._pending_line.
1443 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001444 # XXX: str.encode() should return bytes
1445 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001446 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001447 for bufsize in range(1, 10):
1448 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001449 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1450 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001451 encoding=encoding)
1452 if do_reads:
1453 got_lines = []
1454 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001455 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001456 if c2 == '':
1457 break
1458 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001459 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001460 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001461 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001462
1463 for got_line, exp_line in zip(got_lines, exp_lines):
1464 self.assertEquals(got_line, exp_line)
1465 self.assertEquals(len(got_lines), len(exp_lines))
1466
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001467 def test_newlines_input(self):
1468 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001469 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1470 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001471 (None, normalized.decode("ascii").splitlines(True)),
1472 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001473 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1474 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1475 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001476 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001477 buf = self.BytesIO(testdata)
1478 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +00001479 self.assertEquals(txt.readlines(), expected)
1480 txt.seek(0)
1481 self.assertEquals(txt.read(), "".join(expected))
1482
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001483 def test_newlines_output(self):
1484 testdict = {
1485 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1486 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1487 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1488 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1489 }
1490 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1491 for newline, expected in tests:
1492 buf = self.BytesIO()
1493 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1494 txt.write("AAA\nB")
1495 txt.write("BB\nCCC\n")
1496 txt.write("X\rY\r\nZ")
1497 txt.flush()
1498 self.assertEquals(buf.closed, False)
1499 self.assertEquals(buf.getvalue(), expected)
1500
1501 def test_destructor(self):
1502 l = []
1503 base = self.BytesIO
1504 class MyBytesIO(base):
1505 def close(self):
1506 l.append(self.getvalue())
1507 base.close(self)
1508 b = MyBytesIO()
1509 t = self.TextIOWrapper(b, encoding="ascii")
1510 t.write("abc")
1511 del t
1512 self.assertEquals([b"abc"], l)
1513
1514 def test_override_destructor(self):
1515 record = []
1516 class MyTextIO(self.TextIOWrapper):
1517 def __del__(self):
1518 record.append(1)
1519 try:
1520 f = super().__del__
1521 except AttributeError:
1522 pass
1523 else:
1524 f()
1525 def close(self):
1526 record.append(2)
1527 super().close()
1528 def flush(self):
1529 record.append(3)
1530 super().flush()
1531 b = self.BytesIO()
1532 t = MyTextIO(b, encoding="ascii")
1533 del t
1534 self.assertEqual(record, [1, 2, 3])
1535
1536 def test_error_through_destructor(self):
1537 # Test that the exception state is not modified by a destructor,
1538 # even if close() fails.
1539 rawio = self.CloseFailureIO()
1540 def f():
1541 self.TextIOWrapper(rawio).xyzzy
1542 with support.captured_output("stderr") as s:
1543 self.assertRaises(AttributeError, f)
1544 s = s.getvalue().strip()
1545 if s:
1546 # The destructor *may* have printed an unraisable error, check it
1547 self.assertEqual(len(s.splitlines()), 1)
1548 self.assert_(s.startswith("Exception IOError: "), s)
1549 self.assert_(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00001550
Guido van Rossum9b76da62007-04-11 01:09:03 +00001551 # Systematic tests of the text I/O API
1552
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001553 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001554 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1555 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001556 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001557 f._CHUNK_SIZE = chunksize
1558 self.assertEquals(f.write("abc"), 3)
1559 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001560 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001561 f._CHUNK_SIZE = chunksize
1562 self.assertEquals(f.tell(), 0)
1563 self.assertEquals(f.read(), "abc")
1564 cookie = f.tell()
1565 self.assertEquals(f.seek(0), 0)
1566 self.assertEquals(f.read(2), "ab")
1567 self.assertEquals(f.read(1), "c")
1568 self.assertEquals(f.read(1), "")
1569 self.assertEquals(f.read(), "")
1570 self.assertEquals(f.tell(), cookie)
1571 self.assertEquals(f.seek(0), 0)
1572 self.assertEquals(f.seek(0, 2), cookie)
1573 self.assertEquals(f.write("def"), 3)
1574 self.assertEquals(f.seek(cookie), cookie)
1575 self.assertEquals(f.read(), "def")
1576 if enc.startswith("utf"):
1577 self.multi_line_test(f, enc)
1578 f.close()
1579
1580 def multi_line_test(self, f, enc):
1581 f.seek(0)
1582 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001583 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001584 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00001585 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001586 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00001587 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001588 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001589 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001590 wlines.append((f.tell(), line))
1591 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001592 f.seek(0)
1593 rlines = []
1594 while True:
1595 pos = f.tell()
1596 line = f.readline()
1597 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00001598 break
1599 rlines.append((pos, line))
Guido van Rossum9b76da62007-04-11 01:09:03 +00001600 self.assertEquals(rlines, wlines)
1601
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001602 def test_telling(self):
1603 f = self.open(support.TESTFN, "w+", encoding="utf8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001604 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001605 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001606 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001607 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001608 p2 = f.tell()
1609 f.seek(0)
1610 self.assertEquals(f.tell(), p0)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001611 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001612 self.assertEquals(f.tell(), p1)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001613 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001614 self.assertEquals(f.tell(), p2)
1615 f.seek(0)
1616 for line in f:
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001617 self.assertEquals(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001618 self.assertRaises(IOError, f.tell)
1619 self.assertEquals(f.tell(), p2)
1620 f.close()
1621
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001622 def test_seeking(self):
1623 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00001624 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001625 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00001626 prefix = bytes(u_prefix.encode("utf-8"))
1627 self.assertEquals(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001628 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00001629 suffix = bytes(u_suffix.encode("utf-8"))
1630 line = prefix + suffix
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001631 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001632 f.write(line*2)
1633 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001634 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001635 s = f.read(prefix_size)
Guido van Rossum98297ee2007-11-06 21:34:58 +00001636 self.assertEquals(s, str(prefix, "ascii"))
Guido van Rossumd76e7792007-04-17 02:38:04 +00001637 self.assertEquals(f.tell(), prefix_size)
1638 self.assertEquals(f.readline(), u_suffix)
1639
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001640 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00001641 # Regression test for a specific bug
1642 data = b'\xe0\xbf\xbf\n'
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001643 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001644 f.write(data)
1645 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001646 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001647 f._CHUNK_SIZE # Just test that it exists
1648 f._CHUNK_SIZE = 2
1649 f.readline()
1650 f.tell()
1651
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001652 def test_seek_and_tell(self):
1653 #Test seek/tell using the StatefulIncrementalDecoder.
1654 # Make test faster by doing smaller seeks
1655 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001656
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001657 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001658 """Tell/seek to various points within a data stream and ensure
1659 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001660 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001661 f.write(data)
1662 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001663 f = self.open(support.TESTFN, encoding='test_decoder')
1664 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001665 decoded = f.read()
1666 f.close()
1667
Neal Norwitze2b07052008-03-18 19:52:05 +00001668 for i in range(min_pos, len(decoded) + 1): # seek positions
1669 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001670 f = self.open(support.TESTFN, encoding='test_decoder')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001671 self.assertEquals(f.read(i), decoded[:i])
1672 cookie = f.tell()
1673 self.assertEquals(f.read(j), decoded[i:i + j])
1674 f.seek(cookie)
1675 self.assertEquals(f.read(), decoded[i:])
1676 f.close()
1677
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001678 # Enable the test decoder.
1679 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001680
1681 # Run the tests.
1682 try:
1683 # Try each test case.
1684 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001685 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001686
1687 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001688 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1689 offset = CHUNK_SIZE - len(input)//2
1690 prefix = b'.'*offset
1691 # Don't bother seeking into the prefix (takes too long).
1692 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001693 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001694
1695 # Ensure our test decoder won't interfere with subsequent tests.
1696 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001697 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001698
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001699 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001700 data = "1234567890"
1701 tests = ("utf-16",
1702 "utf-16-le",
1703 "utf-16-be",
1704 "utf-32",
1705 "utf-32-le",
1706 "utf-32-be")
1707 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001708 buf = self.BytesIO()
1709 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001710 # Check if the BOM is written only once (see issue1753).
1711 f.write(data)
1712 f.write(data)
1713 f.seek(0)
1714 self.assertEquals(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00001715 f.seek(0)
1716 self.assertEquals(f.read(), data * 2)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001717 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1718
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001719 def test_read_one_by_one(self):
1720 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001721 reads = ""
1722 while True:
1723 c = txt.read(1)
1724 if not c:
1725 break
1726 reads += c
1727 self.assertEquals(reads, "AA\nBB")
1728
1729 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001730 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001731 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001732 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001733 reads = ""
1734 while True:
1735 c = txt.read(128)
1736 if not c:
1737 break
1738 reads += c
1739 self.assertEquals(reads, "A"*127+"\nB")
1740
1741 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001742 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001743
1744 # read one char at a time
1745 reads = ""
1746 while True:
1747 c = txt.read(1)
1748 if not c:
1749 break
1750 reads += c
1751 self.assertEquals(reads, self.normalized)
1752
1753 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001754 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001755 txt._CHUNK_SIZE = 4
1756
1757 reads = ""
1758 while True:
1759 c = txt.read(4)
1760 if not c:
1761 break
1762 reads += c
1763 self.assertEquals(reads, self.normalized)
1764
1765 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001766 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001767 txt._CHUNK_SIZE = 4
1768
1769 reads = txt.read(4)
1770 reads += txt.read(4)
1771 reads += txt.readline()
1772 reads += txt.readline()
1773 reads += txt.readline()
1774 self.assertEquals(reads, self.normalized)
1775
1776 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001777 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001778 txt._CHUNK_SIZE = 4
1779
1780 reads = txt.read(4)
1781 reads += txt.read()
1782 self.assertEquals(reads, self.normalized)
1783
1784 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001785 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001786 txt._CHUNK_SIZE = 4
1787
1788 reads = txt.read(4)
1789 pos = txt.tell()
1790 txt.seek(0)
1791 txt.seek(pos)
1792 self.assertEquals(txt.read(4), "BBB\n")
1793
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00001794 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001795 buffer = self.BytesIO(self.testdata)
1796 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00001797
1798 self.assertEqual(buffer.seekable(), txt.seekable())
1799
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001800class CTextIOWrapperTest(TextIOWrapperTest):
1801
1802 def test_initialization(self):
1803 r = self.BytesIO(b"\xc3\xa9\n\n")
1804 b = self.BufferedReader(r, 1000)
1805 t = self.TextIOWrapper(b)
1806 self.assertRaises(TypeError, t.__init__, b, newline=42)
1807 self.assertRaises(ValueError, t.read)
1808 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1809 self.assertRaises(ValueError, t.read)
1810
1811 def test_garbage_collection(self):
1812 # C TextIOWrapper objects are collected, and collecting them flushes
1813 # all data to disk.
1814 # The Python version has __del__, so it ends in gc.garbage instead.
1815 rawio = io.FileIO(support.TESTFN, "wb")
1816 b = self.BufferedWriter(rawio)
1817 t = self.TextIOWrapper(b, encoding="ascii")
1818 t.write("456def")
1819 t.x = t
1820 wr = weakref.ref(t)
1821 del t
1822 gc.collect()
1823 self.assert_(wr() is None, wr)
1824 with open(support.TESTFN, "rb") as f:
1825 self.assertEqual(f.read(), b"456def")
1826
1827class PyTextIOWrapperTest(TextIOWrapperTest):
1828 pass
1829
1830
1831class IncrementalNewlineDecoderTest(unittest.TestCase):
1832
1833 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00001834 # UTF-8 specific tests for a newline decoder
1835 def _check_decode(b, s, **kwargs):
1836 # We exercise getstate() / setstate() as well as decode()
1837 state = decoder.getstate()
1838 self.assertEquals(decoder.decode(b, **kwargs), s)
1839 decoder.setstate(state)
1840 self.assertEquals(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001841
Antoine Pitrou180a3362008-12-14 16:36:46 +00001842 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001843
Antoine Pitrou180a3362008-12-14 16:36:46 +00001844 _check_decode(b'\xe8', "")
1845 _check_decode(b'\xa2', "")
1846 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001847
Antoine Pitrou180a3362008-12-14 16:36:46 +00001848 _check_decode(b'\xe8', "")
1849 _check_decode(b'\xa2', "")
1850 _check_decode(b'\x88', "\u8888")
1851
1852 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001853 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
1854
Antoine Pitrou180a3362008-12-14 16:36:46 +00001855 decoder.reset()
1856 _check_decode(b'\n', "\n")
1857 _check_decode(b'\r', "")
1858 _check_decode(b'', "\n", final=True)
1859 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001860
Antoine Pitrou180a3362008-12-14 16:36:46 +00001861 _check_decode(b'\r', "")
1862 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001863
Antoine Pitrou180a3362008-12-14 16:36:46 +00001864 _check_decode(b'\r\r\n', "\n\n")
1865 _check_decode(b'\r', "")
1866 _check_decode(b'\r', "\n")
1867 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001868
Antoine Pitrou180a3362008-12-14 16:36:46 +00001869 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
1870 _check_decode(b'\xe8\xa2\x88', "\u8888")
1871 _check_decode(b'\n', "\n")
1872 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
1873 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00001874
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001875 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00001876 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001877 if encoding is not None:
1878 encoder = codecs.getincrementalencoder(encoding)()
1879 def _decode_bytewise(s):
1880 # Decode one byte at a time
1881 for b in encoder.encode(s):
1882 result.append(decoder.decode(bytes([b])))
1883 else:
1884 encoder = None
1885 def _decode_bytewise(s):
1886 # Decode one char at a time
1887 for c in s:
1888 result.append(decoder.decode(c))
Antoine Pitrou180a3362008-12-14 16:36:46 +00001889 self.assertEquals(decoder.newlines, None)
1890 _decode_bytewise("abc\n\r")
1891 self.assertEquals(decoder.newlines, '\n')
1892 _decode_bytewise("\nabc")
1893 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1894 _decode_bytewise("abc\r")
1895 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1896 _decode_bytewise("abc")
1897 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
1898 _decode_bytewise("abc\r")
1899 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
1900 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001901 input = "abc"
1902 if encoder is not None:
1903 encoder.reset()
1904 input = encoder.encode(input)
1905 self.assertEquals(decoder.decode(input), "abc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00001906 self.assertEquals(decoder.newlines, None)
1907
1908 def test_newline_decoder(self):
1909 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001910 # None meaning the IncrementalNewlineDecoder takes unicode input
1911 # rather than bytes input
1912 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00001913 'utf-16', 'utf-16-le', 'utf-16-be',
1914 'utf-32', 'utf-32-le', 'utf-32-be',
1915 )
1916 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001917 decoder = enc and codecs.getincrementaldecoder(enc)()
1918 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
1919 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00001920 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001921 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
1922 self.check_newline_decoding_utf8(decoder)
1923
Antoine Pitrou66913e22009-03-06 23:40:56 +00001924 def test_newline_bytes(self):
1925 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
1926 def _check(dec):
1927 self.assertEquals(dec.newlines, None)
1928 self.assertEquals(dec.decode("\u0D00"), "\u0D00")
1929 self.assertEquals(dec.newlines, None)
1930 self.assertEquals(dec.decode("\u0A00"), "\u0A00")
1931 self.assertEquals(dec.newlines, None)
1932 dec = self.IncrementalNewlineDecoder(None, translate=False)
1933 _check(dec)
1934 dec = self.IncrementalNewlineDecoder(None, translate=True)
1935 _check(dec)
1936
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001937class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
1938 pass
1939
1940class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
1941 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00001942
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00001943
Guido van Rossum01a27522007-03-07 01:00:12 +00001944# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001945
Guido van Rossum5abbf752007-08-27 17:39:33 +00001946class MiscIOTest(unittest.TestCase):
1947
Barry Warsaw40e82462008-11-20 20:14:50 +00001948 def tearDown(self):
1949 support.unlink(support.TESTFN)
1950
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001951 def test___all__(self):
1952 for name in self.io.__all__:
1953 obj = getattr(self.io, name, None)
Guido van Rossum5abbf752007-08-27 17:39:33 +00001954 self.assert_(obj is not None, name)
1955 if name == "open":
1956 continue
1957 elif "error" in name.lower():
1958 self.assert_(issubclass(obj, Exception), name)
1959 else:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001960 self.assert_(issubclass(obj, self.IOBase), name)
Benjamin Peterson65676e42008-11-05 21:42:45 +00001961
Barry Warsaw40e82462008-11-20 20:14:50 +00001962 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001963 f = self.open(support.TESTFN, "wb", buffering=0)
Benjamin Peterson44309e62008-11-22 00:41:45 +00001964 self.assertEquals(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00001965 f.close()
1966
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001967 f = self.open(support.TESTFN, "U")
Barry Warsaw40e82462008-11-20 20:14:50 +00001968 self.assertEquals(f.name, support.TESTFN)
1969 self.assertEquals(f.buffer.name, support.TESTFN)
1970 self.assertEquals(f.buffer.raw.name, support.TESTFN)
1971 self.assertEquals(f.mode, "U")
Benjamin Peterson44309e62008-11-22 00:41:45 +00001972 self.assertEquals(f.buffer.mode, "rb")
1973 self.assertEquals(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00001974 f.close()
1975
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001976 f = self.open(support.TESTFN, "w+")
Barry Warsaw40e82462008-11-20 20:14:50 +00001977 self.assertEquals(f.mode, "w+")
Benjamin Peterson44309e62008-11-22 00:41:45 +00001978 self.assertEquals(f.buffer.mode, "rb+") # Does it really matter?
1979 self.assertEquals(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00001980
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001981 g = self.open(f.fileno(), "wb", closefd=False)
Benjamin Peterson44309e62008-11-22 00:41:45 +00001982 self.assertEquals(g.mode, "wb")
1983 self.assertEquals(g.raw.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00001984 self.assertEquals(g.name, f.fileno())
1985 self.assertEquals(g.raw.name, f.fileno())
1986 f.close()
1987 g.close()
1988
Antoine Pitrou8043cf82009-01-09 19:54:29 +00001989 def test_io_after_close(self):
1990 for kwargs in [
1991 {"mode": "w"},
1992 {"mode": "wb"},
1993 {"mode": "w", "buffering": 1},
1994 {"mode": "w", "buffering": 2},
1995 {"mode": "wb", "buffering": 0},
1996 {"mode": "r"},
1997 {"mode": "rb"},
1998 {"mode": "r", "buffering": 1},
1999 {"mode": "r", "buffering": 2},
2000 {"mode": "rb", "buffering": 0},
2001 {"mode": "w+"},
2002 {"mode": "w+b"},
2003 {"mode": "w+", "buffering": 1},
2004 {"mode": "w+", "buffering": 2},
2005 {"mode": "w+b", "buffering": 0},
2006 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002007 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002008 f.close()
2009 self.assertRaises(ValueError, f.flush)
2010 self.assertRaises(ValueError, f.fileno)
2011 self.assertRaises(ValueError, f.isatty)
2012 self.assertRaises(ValueError, f.__iter__)
2013 if hasattr(f, "peek"):
2014 self.assertRaises(ValueError, f.peek, 1)
2015 self.assertRaises(ValueError, f.read)
2016 if hasattr(f, "read1"):
2017 self.assertRaises(ValueError, f.read1, 1024)
2018 if hasattr(f, "readinto"):
2019 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2020 self.assertRaises(ValueError, f.readline)
2021 self.assertRaises(ValueError, f.readlines)
2022 self.assertRaises(ValueError, f.seek, 0)
2023 self.assertRaises(ValueError, f.tell)
2024 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002025 self.assertRaises(ValueError, f.write,
2026 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002027 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002028 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002029
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002030 def test_blockingioerror(self):
2031 # Various BlockingIOError issues
2032 self.assertRaises(TypeError, self.BlockingIOError)
2033 self.assertRaises(TypeError, self.BlockingIOError, 1)
2034 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2035 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2036 b = self.BlockingIOError(1, "")
2037 self.assertEqual(b.characters_written, 0)
2038 class C(str):
2039 pass
2040 c = C("")
2041 b = self.BlockingIOError(1, c)
2042 c.b = b
2043 b.c = c
2044 wr = weakref.ref(c)
2045 del c, b
2046 gc.collect()
2047 self.assert_(wr() is None, wr)
2048
2049 def test_abcs(self):
2050 # Test the visible base classes are ABCs.
2051 self.assertTrue(isinstance(self.IOBase, abc.ABCMeta))
2052 self.assertTrue(isinstance(self.RawIOBase, abc.ABCMeta))
2053 self.assertTrue(isinstance(self.BufferedIOBase, abc.ABCMeta))
2054 self.assertTrue(isinstance(self.TextIOBase, abc.ABCMeta))
2055
2056 def _check_abc_inheritance(self, abcmodule):
2057 with self.open(support.TESTFN, "wb", buffering=0) as f:
2058 self.assertTrue(isinstance(f, abcmodule.IOBase))
2059 self.assertTrue(isinstance(f, abcmodule.RawIOBase))
2060 self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
2061 self.assertFalse(isinstance(f, abcmodule.TextIOBase))
2062 with self.open(support.TESTFN, "wb") as f:
2063 self.assertTrue(isinstance(f, abcmodule.IOBase))
2064 self.assertFalse(isinstance(f, abcmodule.RawIOBase))
2065 self.assertTrue(isinstance(f, abcmodule.BufferedIOBase))
2066 self.assertFalse(isinstance(f, abcmodule.TextIOBase))
2067 with self.open(support.TESTFN, "w") as f:
2068 self.assertTrue(isinstance(f, abcmodule.IOBase))
2069 self.assertFalse(isinstance(f, abcmodule.RawIOBase))
2070 self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
2071 self.assertTrue(isinstance(f, abcmodule.TextIOBase))
2072
2073 def test_abc_inheritance(self):
2074 # Test implementations inherit from their respective ABCs
2075 self._check_abc_inheritance(self)
2076
2077 def test_abc_inheritance_official(self):
2078 # Test implementations inherit from the official ABCs of the
2079 # baseline "io" module.
2080 self._check_abc_inheritance(io)
2081
2082class CMiscIOTest(MiscIOTest):
2083 io = io
2084
2085class PyMiscIOTest(MiscIOTest):
2086 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002087
Guido van Rossum28524c72007-02-27 05:47:44 +00002088def test_main():
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002089 tests = (CIOTest, PyIOTest,
2090 CBufferedReaderTest, PyBufferedReaderTest,
2091 CBufferedWriterTest, PyBufferedWriterTest,
2092 CBufferedRWPairTest, PyBufferedRWPairTest,
2093 CBufferedRandomTest, PyBufferedRandomTest,
2094 StatefulIncrementalDecoderTest,
2095 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2096 CTextIOWrapperTest, PyTextIOWrapperTest,
2097 CMiscIOTest, PyMiscIOTest,)
2098
2099 # Put the namespaces of the IO module we are testing and some useful mock
2100 # classes in the __dict__ of each test.
2101 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
2102 MockNonBlockWriterIO)
2103 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2104 c_io_ns = {name : getattr(io, name) for name in all_members}
2105 py_io_ns = {name : getattr(pyio, name) for name in all_members}
2106 globs = globals()
2107 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2108 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2109 # Avoid turning open into a bound method.
2110 py_io_ns["open"] = pyio.OpenWrapper
2111 for test in tests:
2112 if test.__name__.startswith("C"):
2113 for name, obj in c_io_ns.items():
2114 setattr(test, name, obj)
2115 elif test.__name__.startswith("Py"):
2116 for name, obj in py_io_ns.items():
2117 setattr(test, name, obj)
2118
2119 support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002120
2121if __name__ == "__main__":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002122 test_main()