blob: 1c7d1f14ff257d2024ff1f18b880698d90344515 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import threading
27import random
Guido van Rossum28524c72007-02-27 05:47:44 +000028import unittest
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000029import weakref
30import gc
31import abc
32from itertools import chain, cycle, count
33from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000034from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000035
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000036import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000037import io # C implementation of io
38import _pyio as pyio # Python implementation of io
Guido van Rossum28524c72007-02-27 05:47:44 +000039
Guido van Rossuma9e20242007-03-08 00:43:48 +000040
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041def _default_chunk_size():
42 """Get the default TextIOWrapper chunk size"""
43 with open(__file__, "r", encoding="latin1") as f:
44 return f._CHUNK_SIZE
45
46
class MockRawIO:
    """Scripted raw stream used to drive the buffered I/O tests.

    Reads are served from a queue of pre-canned chunks and writes are
    recorded, so a test controls exactly what the raw layer returns and can
    inspect what it received.  A ``None`` chunk in the queue makes
    readinto() return None.
    """

    def __init__(self, read_stack=()):
        # Chunks handed out by read()/readinto(), consumed front to back.
        self._read_stack = list(read_stack)
        # Every chunk passed to write(), in call order.
        self._write_stack = []
        # Number of read()/readinto() calls made so far.
        self._reads = 0

    def read(self, n=None):
        """Return the next queued chunk, or b"" once the queue is empty.

        *n* is ignored: the whole chunk is returned whatever its size.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted queue means EOF.  Anything else must
            # propagate instead of being silently turned into b"".
            return b""

    def write(self, b):
        """Record a bytes copy of *b* and report all of it as written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        # Arbitrary fixed descriptor number that tests compare against.
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy (part of) the next queued chunk into *buf*.

        Returns the number of bytes stored, 0 when the queue is empty, or
        None when the next queued item is None.  A chunk larger than *buf*
        is split: the remainder stays at the front of the queue.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            # The whole chunk fits: consume it.
            del self._read_stack[0]
            buf[:n] = data
            return n
        # Fill the buffer and keep the unread tail for the next call.
        buf[:] = data[:max_len]
        self._read_stack[0] = data[max_len:]
        return max_len

    def truncate(self, pos=None):
        """Pretend to truncate; simply echoes *pos* back."""
        return pos
105
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO combined with RawIOBase from the C implementation of io."""
108
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO combined with RawIOBase from the Python implementation."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000111
Guido van Rossuma9e20242007-03-08 00:43:48 +0000112
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods return deliberately bogus results."""

    def write(self, b):
        # Report twice the count returned by the well-behaved write().
        reported = super().write(b)
        return reported * 2

    def read(self, n=None):
        # Return the chunk from the well-behaved read(), doubled.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        # A negative position, whatever was asked for.
        return -123

    def tell(self):
        # A negative position here too.
        return -456

    def readinto(self, buf):
        # Perform the real transfer but report five times the buffer size.
        super().readinto(buf)
        capacity = len(buf)
        return capacity * 5
129
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO combined with the C implementation's RawIOBase."""
132
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO combined with the Python implementation's RawIOBase."""
135
136
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() call fails with IOError."""

    # Flips to 1 on the first close() so that later calls are silent no-ops.
    closed = 0

    def close(self):
        if self.closed:
            return
        # Mark as closed *before* raising so the failure happens only once.
        self.closed = 1
        raise IOError
143 raise IOError
144
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO combined with the C implementation's RawIOBase."""
147
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO combined with the Python implementation's RawIOBase."""
150
151
class MockFileIO:
    """Mixin that logs the size of every read()/readinto() it services.

    It must be combined with a class providing __init__(data), read() and
    readinto(); the results of each call are appended to ``read_history``.
    """

    def __init__(self, data):
        # One entry per read()/readinto() call, in call order.
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        # Log the length of what was returned, or None if nothing was.
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000167
class CMockFileIO(MockFileIO, io.BytesIO):
    """Read-logging mixin applied to the C implementation's BytesIO."""
Guido van Rossuma9e20242007-03-08 00:43:48 +0000170
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """Read-logging mixin applied to the Python implementation's BytesIO."""
173
174
class MockNonBlockWriterIO:
    """Writer mock that can be told to "block" part-way through a write.

    Subclasses must provide a ``BlockingIOError`` attribute naming the
    exception class to raise.
    """

    def __init__(self):
        # Chunks accepted so far and not yet collected by pop_written().
        self._write_stack = []
        # Byte sequence that triggers a one-shot block, or None.
        self._blocker_char = None

    def pop_written(self):
        """Return everything accepted so far and forget it."""
        joined = b"".join(self._write_stack)
        del self._write_stack[:]
        return joined

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Accept *b*, or only the part before the blocker if it occurs.

        When the blocker is found, the data preceding it is recorded, the
        blocker is disarmed and BlockingIOError is raised carrying the
        number of bytes that were accepted.
        """
        payload = bytes(b)
        blocker = self._blocker_char
        if blocker:
            cut = payload.find(blocker)
            if cut != -1:
                self._blocker_char = None
                self._write_stack.append(payload[:cut])
                raise self.BlockingIOError(0, "test blocking", cut)
        self._write_stack.append(payload)
        return len(payload)
212 return len(b)
213
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """Blocking-writer mock raising the C implementation's BlockingIOError."""

    BlockingIOError = io.BlockingIOError
216
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """Blocking-writer mock raising the Python implementation's BlockingIOError."""

    BlockingIOError = pyio.BlockingIOError
219
Guido van Rossuma9e20242007-03-08 00:43:48 +0000220
class IOTest(unittest.TestCase):
    """General tests of open() and the raw/buffered/bytes file objects.

    The implementation under test is reached through attributes of the test
    case (self.open, self.FileIO, self.BytesIO, self.IOBase, ...).
    NOTE(review): those attributes are not defined in this class; they are
    presumably bound elsewhere in the file -- confirm.
    """

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        # Shared write/seek/tell/truncate script; it leaves the stream
        # holding b"hello world\n" (12 bytes).
        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)
        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 12)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        # Shared read/readinto/seek script for a stream containing
        # b"hello world\n".  The argument-less read() calls are only made
        # when *buffered* is true.
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by the large-file tests (2 GiB).
    LARGE = 2**31

    def large_file_ops(self, f):
        # Seek/write/truncate script operating around the LARGE offset.
        # Use unittest assertions rather than bare asserts, which are
        # stripped when running under -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 1)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_raw_file_io(self):
        # "with" guarantees the file is closed even if an assertion fails.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        # Write through self.open as well, so that both halves of the test
        # use the implementation under test.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(), b"another line")

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; It takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                print("\nTesting large file ops skipped on %s." % sys.platform,
                      file=sys.stderr)
                print("It requires %d bytes and a long time." % self.LARGE,
                      file=sys.stderr)
                print("Use 'regrtest.py -u largefile test_io' to run it.",
                      file=sys.stderr)
                return
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # The file must be closed on leaving the with block, whether it is
        # left normally or through an exception.
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertTrue(f.tell() > 0)

    def test_destructor(self):
        # Deleting the last reference must run __del__, close() and flush()
        # in that order, and the written data must reach the disk.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        self.assertEqual(record, [1, 2, 3])
        # Close the handle used for verification instead of leaking it.
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        # Same destructor-ordering check as test_destructor, for a subclass
        # of the abstract class *base*.
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        a = array.array('i', range(10))
        # Size of the array's buffer in bytes, computed without the
        # version-specific tostring()/tobytes() methods.
        n = len(a) * a.itemsize
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r",
                          closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        # Create a reference cycle so that only the cyclic GC can free it.
        f.f = f
        wr = weakref.ref(f)
        del f
        gc.collect()
        self.assertTrue(wr() is None, wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000506
class CIOTest(IOTest):
    """IOTest specialization named for the C implementation of io.

    NOTE(review): the open/FileIO/BytesIO/... attributes used by IOTest are
    not set here -- presumably bound elsewhere in the file; confirm.
    """
Guido van Rossuma9e20242007-03-08 00:43:48 +0000509
class PyIOTest(IOTest):
    """IOTest specialization named for the Python implementation of io.

    NOTE(review): the open/FileIO/BytesIO/... attributes used by IOTest are
    not set here -- presumably bound elsewhere in the file; confirm.
    """
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000512
Guido van Rossuma9e20242007-03-08 00:43:48 +0000513
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    #
    # Mixin: the concrete test class supplies ``tp`` (the buffered class
    # under test).  NOTE(review): self.MockRawIO and self.CloseFailureIO are
    # not defined here -- presumably bound elsewhere in the file; confirm.

    def test_fileno(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        # The buffered object must report the raw object's descriptor.
        self.assertEqual(42, bufio.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Invalid whence
        self.assertRaises(ValueError, bufio.seek, 0, -1)
        self.assertRaises(ValueError, bufio.seek, 0, 3)

    def test_override_destructor(self):
        # Overridden __del__/close()/flush() must be invoked, in that order,
        # when the object goes away; flush() is only expected for writable
        # objects.
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        writable = bufio.writable()
        del bufio
        if writable:
            self.assertEqual(record, [1, 2, 3])
        else:
            self.assertEqual(record, [1, 2])

    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        def _with():
            with bufio:
                pass
        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            self.tp(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception IOError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000588
589
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    """Tests for a BufferedReader class, supplied by subclasses as ``tp``.

    NOTE(review): self.MockRawIO, self.MockFileIO and self.MisbehavedRawIO
    are not defined here -- presumably bound elsewhere in the file; confirm.
    """

    # Mode used to open the raw file in test_threads.
    read_mode = "rb"

    def test_constructor(self):
        # __init__ may be called again on an existing object; invalid
        # buffer sizes must be rejected with ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_read(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcdef", bufio.read(6))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # rawio._reads counts raw read calls, which lets the test check
        # when read1() is served from the buffer.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        # A short read only overwrites the start of the buffer.
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes requested from the buffered
        # reader, sizes expected to have been read from the raw object.
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertTrue(bufio.read() is None)
        self.assertEqual(b"", bufio.read())

    def test_read_past_eof(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            l = list(range(256)) * N
            random.shuffle(l)
            s = bytes(bytearray(l))
            with io.open(support.TESTFN, "wb") as f:
                f.write(s)
            with io.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def f():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            s = bufio.read(n)
                            if not s:
                                break
                            # list.append() is atomic
                            results.append(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                # Every byte value must have been read exactly N times.
                s = b''.join(results)
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(s.count(c), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # Bogus results from the raw seek()/tell() must surface as IOError.
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
736
class CBufferedReaderTest(BufferedReaderTest):
    """Run the BufferedReader tests against the C implementation."""

    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # A failed re-initialization must leave the object unusable:
        # every subsequent read() raises ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        try:
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            # Create a reference cycle so only the cyclic GC can free it.
            f.f = f
            wr = weakref.ref(f)
            del f
            gc.collect()
            self.assertTrue(wr() is None, wr)
        finally:
            # Do not leave the scratch file behind for later tests.
            support.unlink(support.TESTFN)
777
class PyBufferedReaderTest(BufferedReaderTest):
    """Run the BufferedReader tests against the pure-Python implementation."""

    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000780
Guido van Rossuma9e20242007-03-08 00:43:48 +0000781
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    """Implementation-independent tests for BufferedWriter.

    Subclasses set ``tp`` to the class under test.
    """

    write_mode = "wb"

    def test_constructor(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        # A valid re-initialization makes the object usable again.
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # intermediate_func(bufio) is invoked after every write and must
        # not disturb the data that eventually reaches the raw stream.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_destructor(self):
        # Dropping the last reference must flush pending data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        try:
            with io.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                bufio.write(b"abcdef")
                self.assertEqual(bufio.truncate(3), 3)
                self.assertEqual(bufio.tell(), 3)
            with io.open(support.TESTFN, "rb", buffering=0) as f:
                self.assertEqual(f.read(), b"abc")
        finally:
            # Do not leave the scratch file behind for later tests.
            support.unlink(support.TESTFN)

    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with io.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with io.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Chunk order is arbitrary, so only per-value counts are checked.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")
981
class CBufferedWriterTest(BufferedWriterTest):
    """Run the BufferedWriter tests against the C implementation."""

    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # A failed re-initialization must leave the object unusable:
        # every subsequent write() raises ValueError.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        try:
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            f.write(b"123xxx")
            # Create a reference cycle so only the cyclic GC can free it.
            f.x = f
            wr = weakref.ref(f)
            del f
            gc.collect()
            self.assertTrue(wr() is None, wr)
            with open(support.TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"123xxx")
        finally:
            # Do not leave the scratch file behind for later tests.
            support.unlink(support.TESTFN)
1019
1020
class PyBufferedWriterTest(BufferedWriterTest):
    """Run the BufferedWriter tests against the pure-Python implementation."""

    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001023
class BufferedRWPairTest(unittest.TestCase):
    """Implementation-independent tests for BufferedRWPair.

    Subclasses set ``tp`` to the class under test.
    """

    def test_basic(self):
        # A freshly built pair over two mock raw streams is not closed.
        reader = self.MockRawIO(())
        writer = self.MockRawIO()
        pair = self.tp(reader, writer)
        self.assertFalse(pair.closed)

        # XXX More Tests
Guido van Rossum01a27522007-03-07 01:00:12 +00001033
class CBufferedRWPairTest(BufferedRWPairTest):
    """Run the BufferedRWPair tests against the C implementation."""

    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001036
class PyBufferedRWPairTest(BufferedRWPairTest):
    """Run the BufferedRWPair tests against the pure-Python implementation."""

    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001039
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001040
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    """Implementation-independent tests for BufferedRandom.

    Inherits both the reader and the writer tests and adds tests that mix
    reads, writes and seeks on a single object.
    """

    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8, 12)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # read_func(bufio[, n]) must consume and return up to n bytes
        # (everything when n is omitted) through some reading API.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            # peek() does not advance the position, so do it by hand.
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        def _peek_ahead(bufio):
            bufio.peek(1)
        self.check_writes(_peek_ahead)
        def _peek_behind(bufio):
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek_behind)

    def test_writes_and_reads(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_misbehaved_io(self):
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)
1167
class CBufferedRandomTest(BufferedRandomTest):
    """Run the BufferedRandom tests against the C implementation."""

    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        expected = (OverflowError, MemoryError, ValueError)
        self.assertRaises(expected, buffered.__init__, raw, sys.maxsize)

    def test_garbage_collection(self):
        # Run the reader flavour first, then the writer flavour.
        for flavour in (CBufferedReaderTest, CBufferedWriterTest):
            flavour.test_garbage_collection(self)
1183 CBufferedWriterTest.test_garbage_collection(self)
1184
class PyBufferedRandomTest(BufferedRandomTest):
    """Run the BufferedRandom tests against the pure-Python implementation."""

    tp = pyio.BufferedRandom
1187
1188
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001189# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1190# properties:
1191# - A single output character can correspond to many bytes of input.
1192# - The number of input bytes to complete the character can be
1193# undetermined until the last input byte is received.
1194# - The number of input bytes can vary depending on previous input.
1195# - A single input byte can correspond to many characters of output.
1196# - The number of output characters can be undetermined until the
1197# last input byte is received.
1198# - The number of output characters can vary depending on previous input.
1199
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  The input buffer is emptied after
    every processed word.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and an empty buffer."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return ``(buffered_bytes, flags)`` with flags = i*100 + o.

        I and O are XORed with 1 before packing so that the flags are 0
        right after reset().
        """
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously returned by getstate()."""
        # Named "flags" rather than "io" so the imported io module is not
        # shadowed inside this method.
        buffer, flags = state
        self.buffer = bytearray(buffer)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Feed *input* (bytes) and return the text of all completed words.

        With *final* true, whatever is left in the buffer is treated as a
        last word.
        """
        pieces = []
        for b in input:
            # self.i is re-read for every byte because process_word() may
            # change it in the middle of the input.
            if self.i == 0: # variable-length, terminated with period
                if b == ord('.'):
                    if self.buffer:
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer: # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        'i...' and 'o...' words update I and O and produce no output; any
        other word is emitted, padded or truncated to O, plus a period.
        The buffer is emptied in every case.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if self.o:
                # pad out with hyphens, then truncate to output length
                output = output.ljust(self.o, '-')[:self.o]
            output += '.'
        self.buffer = bytearray()
        return output

    # Switch for lookupTestDecoder(); off unless code sets it to True.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function for the 'test_decoder' codec.

        Returns a CodecInfo that encodes with latin-1 and decodes
        incrementally with this class, or None when the codec is disabled
        or *name* is not 'test_decoder'.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None
1284
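# Register StatefulIncrementalDecoder's lookup function with the codec
# registry.  The lookup is disabled by default (codecEnabled is False);
# tests that need the 'test_decoder' codec enable it explicitly.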
1287codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1288
1289
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for input, eof, output in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(input, eof), output)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', True), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001332
1333class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001334
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001335 def setUp(self):
1336 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1337 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001338 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001339
Guido van Rossumd0712812007-04-11 16:32:43 +00001340 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001341 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001342
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001343 def test_constructor(self):
1344 r = self.BytesIO(b"\xc3\xa9\n\n")
1345 b = self.BufferedReader(r, 1000)
1346 t = self.TextIOWrapper(b)
1347 t.__init__(b, encoding="latin1", newline="\r\n")
1348 self.assertEquals(t.encoding, "latin1")
1349 self.assertEquals(t.line_buffering, False)
1350 t.__init__(b, encoding="utf8", line_buffering=True)
1351 self.assertEquals(t.encoding, "utf8")
1352 self.assertEquals(t.line_buffering, True)
1353 self.assertEquals("\xe9\n", t.readline())
1354 self.assertRaises(TypeError, t.__init__, b, newline=42)
1355 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1356
1357 def test_line_buffering(self):
1358 r = self.BytesIO()
1359 b = self.BufferedWriter(r, 1000)
1360 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001361 t.write("X")
1362 self.assertEquals(r.getvalue(), b"") # No flush happened
1363 t.write("Y\nZ")
1364 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
1365 t.write("A\rB")
1366 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
1367
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001368 def test_encoding(self):
1369 # Check the encoding attribute is always set, and valid
1370 b = self.BytesIO()
1371 t = self.TextIOWrapper(b, encoding="utf8")
1372 self.assertEqual(t.encoding, "utf8")
1373 t = self.TextIOWrapper(b)
1374 self.assert_(t.encoding is not None)
1375 codecs.lookup(t.encoding)
1376
1377 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001378 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001379 b = self.BytesIO(b"abc\n\xff\n")
1380 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001381 self.assertRaises(UnicodeError, t.read)
1382 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001383 b = self.BytesIO(b"abc\n\xff\n")
1384 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001385 self.assertRaises(UnicodeError, t.read)
1386 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001387 b = self.BytesIO(b"abc\n\xff\n")
1388 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001389 self.assertEquals(t.read(), "abc\n\n")
1390 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001391 b = self.BytesIO(b"abc\n\xff\n")
1392 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001393 self.assertEquals(t.read(), "abc\n\ufffd\n")
1394
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001395 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001396 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001397 b = self.BytesIO()
1398 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001399 self.assertRaises(UnicodeError, t.write, "\xff")
1400 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001401 b = self.BytesIO()
1402 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001403 self.assertRaises(UnicodeError, t.write, "\xff")
1404 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001405 b = self.BytesIO()
1406 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001407 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001408 t.write("abc\xffdef\n")
1409 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001410 self.assertEquals(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001411 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001412 b = self.BytesIO()
1413 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001414 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001415 t.write("abc\xffdef\n")
1416 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001417 self.assertEquals(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001418
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001419 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001420 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1421
1422 tests = [
1423 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001424 [ '', input_lines ],
1425 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1426 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1427 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001428 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001429 encodings = (
1430 'utf-8', 'latin-1',
1431 'utf-16', 'utf-16-le', 'utf-16-be',
1432 'utf-32', 'utf-32-le', 'utf-32-be',
1433 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001434
Guido van Rossum8358db22007-08-18 21:39:55 +00001435 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001436 # character in TextIOWrapper._pending_line.
1437 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001438 # XXX: str.encode() should return bytes
1439 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001440 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001441 for bufsize in range(1, 10):
1442 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001443 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1444 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001445 encoding=encoding)
1446 if do_reads:
1447 got_lines = []
1448 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001449 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001450 if c2 == '':
1451 break
1452 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001453 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001454 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001455 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001456
1457 for got_line, exp_line in zip(got_lines, exp_lines):
1458 self.assertEquals(got_line, exp_line)
1459 self.assertEquals(len(got_lines), len(exp_lines))
1460
def test_newlines_input(self):
    # Read the same byte string through TextIOWrapper with every supported
    # ``newline`` argument and check both readlines() and read().
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    # With newline=None every "\r\n" and lone "\r" is expected to come back
    # as "\n".
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    cases = [
        (None, normalized.decode("ascii").splitlines(True)),
        # newline="" keeps the original terminators but still splits on
        # all of them.
        ("", testdata.decode("ascii").splitlines(True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, expected in cases:
        buf = self.BytesIO(testdata)
        txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
        self.assertEqual(txt.readlines(), expected)
        # Reading everything in one go must yield the same text as the
        # concatenated lines.
        txt.seek(0)
        self.assertEqual(txt.read(), "".join(expected))
1476
def test_newlines_output(self):
    # Write the same text through TextIOWrapper with every supported
    # ``newline`` argument and check the bytes that reach the buffer.
    # Keys are the ``newline`` argument, values the expected raw output.
    testdict = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
    }
    # newline=None is expected to behave like the platform's os.linesep.
    tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
    for newline, expected in tests:
        buf = self.BytesIO()
        txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
        # The writes deliberately split a line ("B" + "BB") across calls.
        txt.write("AAA\nB")
        txt.write("BB\nCCC\n")
        txt.write("X\rY\r\nZ")
        txt.flush()
        self.assertEqual(buf.closed, False)
        self.assertEqual(buf.getvalue(), expected)
1494
def test_destructor(self):
    # Deleting the last reference to a TextIOWrapper must close the
    # underlying buffer after the pending text has been written to it.
    seen_at_close = []
    base = self.BytesIO

    class MyBytesIO(base):
        def close(self):
            # Record the buffer content at the moment close() is called.
            seen_at_close.append(self.getvalue())
            base.close(self)

    b = MyBytesIO()
    t = self.TextIOWrapper(b, encoding="ascii")
    t.write("abc")
    del t
    self.assertEqual([b"abc"], seen_at_close)
1507
def test_override_destructor(self):
    # A subclass overriding __del__, close() and flush() must see them
    # invoked in exactly that order when the object is destroyed.
    calls = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            try:
                parent_del = super().__del__
            except AttributeError:
                # The base class defines no __del__; nothing to chain to.
                return
            parent_del()

        def close(self):
            calls.append(2)
            super().close()

        def flush(self):
            calls.append(3)
            super().flush()

    raw = self.BytesIO()
    wrapper = MyTextIO(raw, encoding="ascii")
    del wrapper
    self.assertEqual(calls, [1, 2, 3])
1529
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()

    def f():
        # The wrapper is only a temporary here; the bogus attribute lookup
        # is what raises the AttributeError checked below.
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as stderr:
        self.assertRaises(AttributeError, f)
    output = stderr.getvalue().strip()
    if output:
        # The destructor *may* have printed an unraisable error, check it
        self.assertEqual(len(output.splitlines()), 1)
        self.assertTrue(output.startswith("Exception IOError: "), output)
        self.assertTrue(output.endswith(" ignored"), output)
Guido van Rossum8358db22007-08-18 21:39:55 +00001544
Guido van Rossum9b76da62007-04-11 01:09:03 +00001545 # Systematic tests of the text I/O API
1546
def test_basic_io(self):
    # Exercise write/read/seek/tell on a real text file for a range of
    # _CHUNK_SIZE values and encodings.  The files are opened with ``with``
    # so they are closed even when an assertion fails.
    # NOTE: "utf-16-be" and "utf-16-le" are not part of the encoding list.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin1", "utf8":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position cookie for the end of the three characters.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                # Append at the end, then read the appended text back.
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
1573
def multi_line_test(self, f, enc):
    # Helper: rewrite ``f`` with lines of various lengths, remembering the
    # tell() value before each write, then read the lines back and check
    # that both the positions and the text match.
    #
    # f   -- text file object open for reading and writing
    # enc -- encoding name of ``f`` (accepted for the caller's convenience;
    #        not used by the checks below)
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    wlines = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to build a line of ``size``
        # characters followed by a newline.
        line = "".join(sample[i % len(sample)] for i in range(size)) + "\n"
        wlines.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    rlines = []
    while True:
        pos = f.tell()
        line = f.readline()
        if not line:
            break
        rlines.append((pos, line))
    self.assertEqual(rlines, wlines)
1595
def test_telling(self):
    # tell() must report the same positions while reading lines back as it
    # did while they were written, and must be refused during iteration.
    with self.open(support.TESTFN, "w+", encoding="utf8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # While the file is being iterated over, tell() is expected
            # to raise.
            self.assertRaises(IOError, f.tell)
        # Once iteration is exhausted tell() works again.
        self.assertEqual(f.tell(), p2)
1615
def test_seeking(self):
    # Read an ASCII prefix that ends two characters short of the default
    # chunk size, then check tell() and the following readline().
    # NOTE(review): presumably this makes the multi-byte "\u8888" straddle
    # a chunk boundary -- confirm against _default_chunk_size().
    chunk_size = _default_chunk_size()
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    # Pure ASCII: one byte per character.
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = u_suffix.encode("utf-8")
    line = prefix + suffix
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    # Use ``with`` so the read handle is closed as well.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        s = f.read(prefix_size)
        self.assertEqual(s, str(prefix, "ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
1633
def test_seeking_too(self):
    # Regression test for a specific bug
    data = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as f:
        f.write(data)
    # Use ``with`` so the read handle is closed as well.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        # A chunk size smaller than the three-byte character in ``data``.
        f._CHUNK_SIZE = 2
        f.readline()
        # The test passes as long as tell() does not raise.
        f.tell()
1645
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def testSeekAndTellWithData(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1): # seek positions
            for j in [1, 5, len(decoded) - i]: # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    # Seeking back to the cookie must restore the state
                    # needed to decode the remainder identically.
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            testSeekAndTellWithData(data)

        # Position each test case so that it crosses a chunk boundary.
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(data)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            testSeekAndTellWithData(prefix + data, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001692
def test_encoded_writes(self):
    # Write the same text twice through several UTF-16/UTF-32 variants and
    # check both the decoded text and the raw bytes.
    data = "1234567890"
    tests = ("utf-16",
             "utf-16-le",
             "utf-16-be",
             "utf-32",
             "utf-32-le",
             "utf-32-be")
    for encoding in tests:
        buf = self.BytesIO()
        f = self.TextIOWrapper(buf, encoding=encoding)
        # Check if the BOM is written only once (see issue1753).
        f.write(data)
        f.write(data)
        f.seek(0)
        self.assertEqual(f.read(), data * 2)
        # Reading a second time after rewinding must give the same text.
        f.seek(0)
        self.assertEqual(f.read(), data * 2)
        self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
1712
def test_read_one_by_one(self):
    # Reading one character at a time must still translate "\r\n" to "\n".
    txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    # iter() with a sentinel keeps calling read(1) until it returns "".
    reads = "".join(iter(lambda: txt.read(1), ""))
    self.assertEqual(reads, "AA\nBB")
1722
1723 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
def test_read_by_chunk(self):
    # make sure "\r\n" straddles 128 char boundary.
    txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
    # iter() with a sentinel keeps calling read(128) until it returns "".
    reads = "".join(iter(lambda: txt.read(128), ""))
    self.assertEqual(reads, "A"*127+"\nB")
1734
def test_issue1395_1(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")

    # read one char at a time; iter() with a sentinel stops at the first
    # empty string returned by read(1).
    reads = "".join(iter(lambda: txt.read(1), ""))
    self.assertEqual(reads, self.normalized)
1746
def test_issue1395_2(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # Read in pieces of the same size as the chunk size until read(4)
    # returns an empty string.
    reads = "".join(iter(lambda: txt.read(4), ""))
    self.assertEqual(reads, self.normalized)
1758
def test_issue1395_3(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # Mix fixed-size reads with readline() calls; the concatenation must
    # still equal the normalized text.
    reads = txt.read(4)
    reads += txt.read(4)
    reads += txt.readline()
    reads += txt.readline()
    reads += txt.readline()
    self.assertEqual(reads, self.normalized)
1769
def test_issue1395_4(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # A short read followed by a read-to-end must give the whole text.
    reads = txt.read(4)
    reads += txt.read()
    self.assertEqual(reads, self.normalized)
1777
def test_issue1395_5(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # Only the position after the first four characters matters here, so
    # the text returned by this read is discarded.
    txt.read(4)
    pos = txt.tell()
    # Seek away and back: the saved position must still be usable.
    txt.seek(0)
    txt.seek(pos)
    self.assertEqual(txt.read(4), "BBB\n")
1787
def test_issue2282(self):
    # The text wrapper must report the same seekability as the buffer it
    # wraps.
    raw = self.BytesIO(self.testdata)
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    self.assertEqual(raw.seekable(), wrapper.seekable())
1793
class CTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapper tests plus the two tests below, which
    # only apply to this variant.

    def test_initialization(self):
        # After a failed re-initialisation the wrapper must refuse reads.
        r = self.BytesIO(b"\xc3\xa9\n\n")
        b = self.BufferedReader(r, 1000)
        t = self.TextIOWrapper(b)
        self.assertRaises(TypeError, t.__init__, b, newline=42)
        self.assertRaises(ValueError, t.read)
        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
        self.assertRaises(ValueError, t.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        rawio = io.FileIO(support.TESTFN, "wb")
        b = self.BufferedWriter(rawio)
        t = self.TextIOWrapper(b, encoding="ascii")
        t.write("456def")
        # Create a reference cycle so only the cyclic collector can free t.
        t.x = t
        wr = weakref.ref(t)
        del t
        gc.collect()
        self.assertTrue(wr() is None, wr)
        with open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")
1820
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Adds nothing of its own: every test is inherited from the base class.
    pass
1823
1824
class IncrementalNewlineDecoderTest(unittest.TestCase):
    # Tests for self.IncrementalNewlineDecoder; that attribute is not
    # defined here and must be provided on the concrete test class.

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def _check_decode(b, s, **kwargs):
            # We exercise getstate() / setstate() as well as decode()
            state = decoder.getstate()
            self.assertEqual(decoder.decode(b, **kwargs), s)
            # Restoring the saved state and decoding the same bytes again
            # must give the same result.
            decoder.setstate(state)
            self.assertEqual(decoder.decode(b, **kwargs), s)

        _check_decode(b'\xe8\xa2\x88', "\u8888")

        # The same character fed one byte at a time: nothing is produced
        # until the last byte arrives.
        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        # A truncated sequence at end of input is an error.
        _check_decode(b'\xe8', "")
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        _check_decode(b'\n', "\n")
        # A lone "\r" is held back until it is known whether "\n" follows.
        _check_decode(b'\r', "")
        _check_decode(b'', "\n", final=True)
        _check_decode(b'\r', "\n", final=True)

        _check_decode(b'\r', "")
        _check_decode(b'a', "\na")

        _check_decode(b'\r\r\n', "\n\n")
        _check_decode(b'\r', "")
        _check_decode(b'\r', "\n")
        _check_decode(b'\na', "\na")

        _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
        _check_decode(b'\xe8\xa2\x88', "\u8888")
        _check_decode(b'\n', "\n")
        _check_decode(b'\xe8\xa2\x88\r', "\u8888")
        _check_decode(b'\n', "\n")

    def check_newline_decoding(self, decoder, encoding):
        # Feed text to ``decoder`` one unit at a time and check both the
        # translated output and the ``newlines`` attribute along the way.
        # ``encoding`` is None when the decoder takes str input directly.
        result = []
        if encoding is not None:
            encoder = codecs.getincrementalencoder(encoding)()
            def _decode_bytewise(s):
                # Decode one byte at a time
                for b in encoder.encode(s):
                    result.append(decoder.decode(bytes([b])))
        else:
            encoder = None
            def _decode_bytewise(s):
                # Decode one char at a time
                for c in s:
                    result.append(decoder.decode(c))
        self.assertEqual(decoder.newlines, None)
        _decode_bytewise("abc\n\r")
        self.assertEqual(decoder.newlines, '\n')
        _decode_bytewise("\nabc")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc")
        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
        # reset() must clear the record of newlines seen so far.
        decoder.reset()
        text = "abc"
        if encoder is not None:
            encoder.reset()
            text = encoder.encode(text)
        self.assertEqual(decoder.decode(text), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            # ``enc and ...`` leaves the inner decoder as None when enc is None.
            decoder = enc and codecs.getincrementaldecoder(enc)()
            decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
            self.check_newline_decoding(decoder, enc)
        decoder = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
        self.check_newline_decoding_utf8(decoder)
1917
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own: every test is inherited from the base class.
    pass
1920
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own: every test is inherited from the base class.
    pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00001923
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00001924
Guido van Rossum01a27522007-03-07 01:00:12 +00001925# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001926
class MiscIOTest(unittest.TestCase):
    # Miscellaneous checks of the io namespace.  Subclasses set ``io`` to
    # the module under test; the other self.* names used below (open,
    # IOBase, BlockingIOError, ...) are not defined in this class.

    def tearDown(self):
        support.unlink(support.TESTFN)

    def test___all__(self):
        # Every public name must exist and, apart from open() and the
        # exception types, be an IOBase subclass.
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertTrue(obj is not None, name)
            if name == "open":
                continue
            elif "error" in name.lower():
                self.assertTrue(issubclass(obj, Exception), name)
            else:
                self.assertTrue(issubclass(obj, self.IOBase), name)

    def test_attributes(self):
        # Check the name and mode attributes at each layer of the stack.
        f = self.open(support.TESTFN, "wb", buffering=0)
        self.assertEqual(f.mode, "wb")
        f.close()

        f = self.open(support.TESTFN, "U")
        self.assertEqual(f.name, support.TESTFN)
        self.assertEqual(f.buffer.name, support.TESTFN)
        self.assertEqual(f.buffer.raw.name, support.TESTFN)
        self.assertEqual(f.mode, "U")
        self.assertEqual(f.buffer.mode, "rb")
        self.assertEqual(f.buffer.raw.mode, "rb")
        f.close()

        f = self.open(support.TESTFN, "w+")
        self.assertEqual(f.mode, "w+")
        self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
        self.assertEqual(f.buffer.raw.mode, "rb+")

        # A second file object on the same descriptor; closefd=False so
        # closing g does not close the descriptor itself.
        g = self.open(f.fileno(), "wb", closefd=False)
        self.assertEqual(g.mode, "wb")
        self.assertEqual(g.raw.mode, "wb")
        self.assertEqual(g.name, f.fileno())
        self.assertEqual(g.raw.name, f.fileno())
        f.close()
        g.close()

    def test_io_after_close(self):
        # Every I/O method must raise ValueError once the file is closed,
        # whatever the mode and buffering it was opened with.
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            # peek, read1 and readinto only exist on some layers.
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            # Pass the argument type matching the mode so that a failure
            # can only come from the closed state.
            self.assertRaises(ValueError, f.write,
                              b"" if "b" in kwargs['mode'] else "")
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        self.assertRaises(TypeError, self.BlockingIOError)
        self.assertRaises(TypeError, self.BlockingIOError, 1)
        self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
        self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
        b = self.BlockingIOError(1, "")
        self.assertEqual(b.characters_written, 0)
        class C(str):
            pass
        c = C("")
        b = self.BlockingIOError(1, c)
        # Build a reference cycle between the exception and its argument
        # and check that the garbage collector can break it.
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        gc.collect()
        self.assertTrue(wr() is None, wr)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        self.assertTrue(isinstance(self.IOBase, abc.ABCMeta))
        self.assertTrue(isinstance(self.RawIOBase, abc.ABCMeta))
        self.assertTrue(isinstance(self.BufferedIOBase, abc.ABCMeta))
        self.assertTrue(isinstance(self.TextIOBase, abc.ABCMeta))

    def _check_abc_inheritance(self, abcmodule):
        # Check that files opened unbuffered, buffered-binary and in text
        # mode are instances of exactly the matching ABCs of ``abcmodule``.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertTrue(isinstance(f, abcmodule.IOBase))
            self.assertTrue(isinstance(f, abcmodule.RawIOBase))
            self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
            self.assertFalse(isinstance(f, abcmodule.TextIOBase))
        with self.open(support.TESTFN, "wb") as f:
            self.assertTrue(isinstance(f, abcmodule.IOBase))
            self.assertFalse(isinstance(f, abcmodule.RawIOBase))
            self.assertTrue(isinstance(f, abcmodule.BufferedIOBase))
            self.assertFalse(isinstance(f, abcmodule.TextIOBase))
        with self.open(support.TESTFN, "w") as f:
            self.assertTrue(isinstance(f, abcmodule.IOBase))
            self.assertFalse(isinstance(f, abcmodule.RawIOBase))
            self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
            self.assertTrue(isinstance(f, abcmodule.TextIOBase))

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)
2062
class CMiscIOTest(MiscIOTest):
    # Module under test for this variant: the module-level ``io``.
    io = io
2065
class PyMiscIOTest(MiscIOTest):
    # Module under test for this variant: the module-level ``pyio``.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002068
def test_main():
    # Attach the io / pyio namespaces and the matching mock classes to each
    # test class as class attributes, then run the whole suite.
    test_classes = (
        CIOTest, PyIOTest,
        CBufferedReaderTest, PyBufferedReaderTest,
        CBufferedWriterTest, PyBufferedWriterTest,
        CBufferedRWPairTest, PyBufferedRWPairTest,
        CBufferedRandomTest, PyBufferedRandomTest,
        StatefulIncrementalDecoderTest,
        CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
        CTextIOWrapperTest, PyTextIOWrapperTest,
        CMiscIOTest, PyMiscIOTest,
    )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mock_classes = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
                    MockNonBlockWriterIO)
    exported = io.__all__ + ["IncrementalNewlineDecoder"]
    c_namespace = dict((member, getattr(io, member)) for member in exported)
    py_namespace = dict((member, getattr(pyio, member)) for member in exported)

    # Each mock is published under its plain name but resolved to the
    # "C"- or "Py"-prefixed variant found in this module's globals.
    module_globals = globals()
    for mock in mock_classes:
        c_namespace[mock.__name__] = module_globals["C" + mock.__name__]
    for mock in mock_classes:
        py_namespace[mock.__name__] = module_globals["Py" + mock.__name__]

    # Avoid turning open into a bound method.
    py_namespace["open"] = pyio.OpenWrapper

    for test_class in test_classes:
        class_name = test_class.__name__
        if class_name.startswith("C"):
            namespace = c_namespace
        elif class_name.startswith("Py"):
            namespace = py_namespace
        else:
            # Classes without either prefix get no injected attributes.
            continue
        for attr, value in namespace.items():
            setattr(test_class, attr, value)

    support.run_unittest(*test_classes)
Guido van Rossum28524c72007-02-27 05:47:44 +00002101
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()