blob: c9f7582ede73c91171307011eaecc99dea6c026e [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import random
Guido van Rossum28524c72007-02-27 05:47:44 +000027import unittest
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000028import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000029import abc
Antoine Pitroub46b9d52010-08-21 19:09:32 +000030import signal
31import errno
Georg Brandl1b37e872010-03-14 10:45:50 +000032from itertools import cycle, count
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000033from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000034from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000035
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000036import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000037import io # C implementation of io
38import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000039try:
40 import threading
41except ImportError:
42 threading = None
Guido van Rossum28524c72007-02-27 05:47:44 +000043
Guido van Rossuma9e20242007-03-08 00:43:48 +000044
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000045def _default_chunk_size():
46 """Get the default TextIOWrapper chunk size"""
47 with open(__file__, "r", encoding="latin1") as f:
48 return f._CHUNK_SIZE
49
50
class MockRawIO:
    """Scripted stand-in for a raw stream.

    Reads are served from ``read_stack`` (a sequence of chunks, consumed from
    the front); writes are recorded in ``_write_stack``.  ``_reads`` counts
    every read()/readinto() call and ``_extraneous_reads`` counts the calls
    made after the scripted chunks ran out.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        self._reads = 0
        self._extraneous_reads = 0

    def read(self, n=None):
        """Return the next scripted chunk (``n`` is ignored), or b"" if none."""
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted script counts as an extraneous read; any
            # other error is a real bug and must propagate.
            self._extraneous_reads += 1
            return b""

    def write(self, b):
        """Record a bytes copy of ``b`` and report it as fully written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy from the head scripted chunk into ``buf``.

        Returns the number of bytes copied, 0 when the script is exhausted,
        or None when the head entry is None (that entry is consumed).
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            self._extraneous_reads += 1
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            # The whole chunk fits: consume it.
            del self._read_stack[0]
            buf[:n] = data
            return n
        # Chunk larger than the buffer: fill it and keep the remainder at the
        # head of the script for the next call.
        buf[:] = data[:max_len]
        self._read_stack[0] = data[max_len:]
        return max_len

    def truncate(self, pos=None):
        return pos
112
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO layered on RawIOBase from ``io`` (the C implementation)."""
115
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO layered on RawIOBase from ``_pyio`` (the Python implementation)."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000118
Guido van Rossuma9e20242007-03-08 00:43:48 +0000119
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods return deliberately bogus results."""

    def write(self, b):
        # Report twice the number of bytes actually recorded.
        written = super().write(b)
        return written * 2

    def read(self, n=None):
        # Hand back the scripted chunk repeated twice.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Perform the real copy, then report an inflated byte count.
        super().readinto(buf)
        return 5 * len(buf)
136
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO layered on RawIOBase from ``io`` (the C implementation)."""
139
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO layered on RawIOBase from ``_pyio`` (the Python implementation)."""
142
143
class CloseFailureIO(MockRawIO):
    """MockRawIO variant whose first close() raises IOError."""

    closed = 0

    def close(self):
        # Later calls are silent no-ops; only the first one fails, and it
        # marks the object closed before raising.
        if self.closed:
            return
        self.closed = 1
        raise IOError
151
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO layered on RawIOBase from ``io`` (the C implementation)."""
154
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO layered on RawIOBase from ``_pyio`` (the Python implementation)."""
157
158
class MockFileIO:
    """Mixin that logs the size of every read()/readinto() result.

    ``read_history`` receives one entry per call: the length of the data
    returned by read() (or None when read() returned None), and the raw
    return value of readinto().
    """

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000174
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO layered on BytesIO from ``io`` (the C implementation)."""
Guido van Rossuma9e20242007-03-08 00:43:48 +0000177
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO layered on BytesIO from ``_pyio`` (the Python implementation)."""
180
181
class MockUnseekableIO:
    """Mixin that makes a stream report itself as unseekable.

    The concrete class must provide an ``UnsupportedOperation`` attribute;
    seek() and tell() raise it.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def tell(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error
191
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """MockUnseekableIO layered on BytesIO from ``io`` (the C implementation)."""

    UnsupportedOperation = io.UnsupportedOperation
194
class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """MockUnseekableIO layered on BytesIO from ``_pyio`` (the Python implementation)."""

    UnsupportedOperation = pyio.UnsupportedOperation
197
198
class MockNonBlockWriterIO:
    """Write-recording mixin that can simulate a blocking write.

    The concrete class must provide a ``BlockingIOError`` attribute, which
    write() raises when it meets the byte armed with block_on().
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far, joined, and reset the record."""
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        data = bytes(b)
        if self._blocker_char:
            try:
                cut = data.index(self._blocker_char)
            except ValueError:
                # Blocker byte absent: fall through to a normal write.
                pass
            else:
                # One-shot: disarm the blocker, keep only the bytes before
                # it, and report that count through the exception.
                self._blocker_char = None
                self._write_stack.append(data[:cut])
                raise self.BlockingIOError(0, "test blocking", cut)
        self._write_stack.append(data)
        return len(data)
237
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO layered on RawIOBase from ``io`` (the C implementation)."""

    BlockingIOError = io.BlockingIOError
240
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO layered on RawIOBase from ``_pyio`` (the Python implementation)."""

    BlockingIOError = pyio.BlockingIOError
243
Guido van Rossuma9e20242007-03-08 00:43:48 +0000244
class IOTest(unittest.TestCase):
    # General tests of open() and the raw/buffered/text layers on real files.
    #
    # The tests read self.open, self.FileIO, self.BytesIO, self.IOBase,
    # self.RawIOBase, self.BufferedIOBase, self.TextIOBase,
    # self.UnsupportedOperation, self.SEEK_CUR and self.SEEK_END.  None of
    # these is defined in this class body; presumably they are supplied per
    # implementation elsewhere in the file.

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        # Helper: run a fixed write/seek/truncate script against the
        # writable binary stream `f`.
        self.assertEqual(f.write(b"blah."), 5)
        f.truncate(0)
        # truncate() is expected to leave the stream position untouched.
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        # Helper: run a fixed read/readinto/seek script against a stream
        # expected to contain b"hello world\n".  With buffered=True,
        # argument-less read() calls are checked too.
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    LARGE = 2**31

    def large_file_ops(self, f):
        # Helper: seek/write/truncate around offset LARGE on the read-write
        # stream `f`.  The preconditions use assertTrue rather than a bare
        # `assert` so they are still checked under -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        exc = self.UnsupportedOperation
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(exc, fp.read)
                self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "wb", buffering=0) as fp:
            self.assertRaises(exc, fp.read)
            self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "rb", buffering=0) as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(exc, fp.write, "blah")
            self.assertRaises(exc, fp.writelines, ["blah\n"])
            # Non-zero seeking from current or end pos
            self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
            self.assertRaises(exc, fp.seek, -1, self.SEEK_END)

    def test_raw_file_io(self):
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space,
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                # Report a skip (as test_unbounded_file does) rather than
                # printing to stderr and silently passing.
                self.skipTest(
                    "test requires %d bytes and a long time to run; "
                    "use 'regrtest.py -u largefile test_io' to run it"
                    % self.LARGE)
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        # Dropping the last reference must run __del__, close() and flush(),
        # in that order, and the written data must reach the file.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        # Helper: subclass `base`, drop the only instance, and check that
        # __del__, close() and flush() ran in that order.
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        a = array.array('i', range(10))
        n = len(a.tobytes())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        # Self-reference, so the object sits in a reference cycle.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed

    def test_multi_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)
584
class CIOTest(IOTest):
    """IOTest variant; by its name, the one meant for the C implementation."""
Guido van Rossuma9e20242007-03-08 00:43:48 +0000587
class PyIOTest(IOTest):
    """IOTest variant; by its name, the one meant for the Python implementation."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000590
Guido van Rossuma9e20242007-03-08 00:43:48 +0000591
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    #
    # The tests read self.tp (the buffered class under test),
    # self.MockRawIO, self.CloseFailureIO, self.MockUnseekableIO and
    # self.UnsupportedOperation; none of them is defined in this class body.

    def test_detach(self):
        raw = self.MockRawIO()
        buf = self.tp(raw)
        self.assertIs(buf.detach(), raw)
        self.assertRaises(ValueError, buf.detach)

    def test_fileno(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        self.assertEqual(42, bufio.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Invalid whence
        self.assertRaises(ValueError, bufio.seek, 0, -1)
        self.assertRaises(ValueError, bufio.seek, 0, 3)

    def test_override_destructor(self):
        # Overridden __del__/close()/flush() must run on destruction;
        # flush() is only expected when the buffered object is writable.
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        if writable:
            self.assertEqual(record, [1, 2, 3])
        else:
            self.assertEqual(record, [1, 2])

    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        def _with():
            with bufio:
                pass
        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            self.tp(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception IOError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    def test_repr(self):
        raw = self.MockRawIO()
        b = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(b), "<%s>" % clsname)
        raw.name = "dummy"
        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)

    def test_flush_error_on_close(self):
        raw = self.MockRawIO()
        def bad_flush():
            raise IOError()
        raw.flush = bad_flush
        b = self.tp(raw)
        self.assertRaises(IOError, b.close) # exception not swallowed

    def test_multi_close(self):
        raw = self.MockRawIO()
        b = self.tp(raw)
        b.close()
        b.close()
        b.close()
        self.assertRaises(ValueError, b.flush)

    def test_unseekable(self):
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
704
Guido van Rossum78892e42007-04-06 17:31:18 +0000705
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    """Implementation-independent tests for buffered readers.

    The class under test is read from ``self.tp``; concrete subclasses
    bind it to a specific implementation.
    """
    read_mode = "rb"

    def test_constructor(self):
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        # Calling __init__ again on a live object must be accepted.
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        # Zero and negative buffer sizes are rejected.
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        # A subsequent valid __init__ makes the object usable again.
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_read(self):
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        # rawio._reads is checked after each read1() call to track how many
        # raw reads have been issued so far.
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        # Short read: only the first byte of the target is overwritten.
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        # At EOF nothing is written into the target.
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")

    def test_readlines(self):
        # A fresh reader is built for every call so each starts at offset 0.
        def bufio():
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, the sizes passed to bufio.read(), and the
        # expected contents of rawio.read_history afterwards.
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

    def test_read_past_eof(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            l = list(range(256)) * N
            random.shuffle(l)
            s = bytes(bytearray(l))
            with self.open(support.TESTFN, "wb") as f:
                f.write(s)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def f():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            s = bufio.read(n)
                            if not s:
                                break
                            # list.append() is atomic
                            results.append(s)
                    except Exception as e:
                        # Record the failure so the main thread can report
                        # it, then let the thread die with the traceback.
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                s = b''.join(results)
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(s.count(c), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            rawio = self.MockRawIO([b"x" * n])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            # Simple case: one raw read is enough to satisfy the request.
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
883
884
class CBufferedReaderTest(BufferedReaderTest):
    """Run the buffered reader tests against ``io.BufferedReader``."""
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After every rejected __init__ call, read() must fail as well.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading differently, so
        # that checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The file created below must not outlive the test.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Put the object in a reference cycle so that only the cyclic
        # collector can reclaim it.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000925
class PyBufferedReaderTest(BufferedReaderTest):
    """Run the buffered reader tests against ``pyio.BufferedReader``."""

    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000928
Guido van Rossuma9e20242007-03-08 00:43:48 +0000929
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    """Implementation-independent tests for buffered writers.

    The class under test is read from ``self.tp``; concrete subclasses
    bind it to a specific implementation.
    """
    write_mode = "wb"

    def test_constructor(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Calling __init__ again on a live object must be accepted.
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        # Zero and negative buffer sizes are rejected.
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        # A subsequent valid __init__ makes the object usable again.
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # detach() pushes pending buffered data to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # intermediate_func(bufio) is invoked after every single write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            # Clamp the final chunk so we never slice past the end.
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seek around using absolute positions, ending where we started.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        # Same idea using relative seeks.
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_destructor(self):
        # Dropping the last reference flushes buffered data to the raw stream.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # The file created below must not outlive the test.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        # Record the failure so the main thread can report
                        # it, then let the thread die with the traceback.
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing the third positional argument must emit the warning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001144
1145
class CBufferedWriterTest(BufferedWriterTest):
    """Run the buffered writer tests against ``io.BufferedWriter``."""
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After every rejected __init__ call, write() must fail as well.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The file created below must not outlive the test.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Put the object in a reference cycle so that only the cyclic
        # collector can reclaim it.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1183
1184
class PyBufferedWriterTest(BufferedWriterTest):
    """Run the buffered writer tests against ``pyio.BufferedWriter``."""

    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001187
class BufferedRWPairTest(unittest.TestCase):
    """Implementation-independent tests for reader/writer pairs.

    The class under test is read from ``self.tp``; concrete subclasses
    bind it to a specific implementation.
    """

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_detach(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_deprecation(self):
        # Passing the fourth positional argument must emit the warning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        # A fresh pair is built for every call so each starts at offset 0.
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        # An explicit None hint behaves like no hint at all.
        self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # peek() may return more than requested, hence startswith().
        self.assertTrue(pair.peek(3).startswith(b"abc"))
        # Peeking must not have advanced the read position.
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # The pair reports a tty as soon as either side is one.
        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001305
class CBufferedRWPairTest(BufferedRWPairTest):
    """Run the reader/writer pair tests against ``io.BufferedRWPair``."""

    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001308
class PyBufferedRWPairTest(BufferedRWPairTest):
    """Run the reader/writer pair tests against ``pyio.BufferedRWPair``."""

    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001311
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001312
1313class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1314 read_mode = "rb+"
1315 write_mode = "wb+"
1316
1317 def test_constructor(self):
1318 BufferedReaderTest.test_constructor(self)
1319 BufferedWriterTest.test_constructor(self)
1320
1321 def test_read_and_write(self):
1322 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001323 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001324
1325 self.assertEqual(b"as", rw.read(2))
1326 rw.write(b"ddd")
1327 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001328 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001329 self.assertEqual(b"ghjk", rw.read())
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001330 self.assertEquals(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001331
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001332 def test_seek_and_tell(self):
1333 raw = self.BytesIO(b"asdfghjkl")
1334 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001335
1336 self.assertEquals(b"as", rw.read(2))
1337 self.assertEquals(2, rw.tell())
1338 rw.seek(0, 0)
1339 self.assertEquals(b"asdf", rw.read(4))
1340
1341 rw.write(b"asdf")
1342 rw.seek(0, 0)
1343 self.assertEquals(b"asdfasdfl", rw.read())
1344 self.assertEquals(9, rw.tell())
1345 rw.seek(-4, 2)
1346 self.assertEquals(5, rw.tell())
1347 rw.seek(2, 1)
1348 self.assertEquals(7, rw.tell())
1349 self.assertEquals(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001350 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001351
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001352 def check_flush_and_read(self, read_func):
1353 raw = self.BytesIO(b"abcdefghi")
1354 bufio = self.tp(raw)
1355
1356 self.assertEquals(b"ab", read_func(bufio, 2))
1357 bufio.write(b"12")
1358 self.assertEquals(b"ef", read_func(bufio, 2))
1359 self.assertEquals(6, bufio.tell())
1360 bufio.flush()
1361 self.assertEquals(6, bufio.tell())
1362 self.assertEquals(b"ghi", read_func(bufio))
1363 raw.seek(0, 0)
1364 raw.write(b"XYZ")
1365 # flush() resets the read buffer
1366 bufio.flush()
1367 bufio.seek(0, 0)
1368 self.assertEquals(b"XYZ", read_func(bufio, 3))
1369
1370 def test_flush_and_read(self):
1371 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1372
1373 def test_flush_and_readinto(self):
1374 def _readinto(bufio, n=-1):
1375 b = bytearray(n if n >= 0 else 9999)
1376 n = bufio.readinto(b)
1377 return bytes(b[:n])
1378 self.check_flush_and_read(_readinto)
1379
1380 def test_flush_and_peek(self):
1381 def _peek(bufio, n=-1):
1382 # This relies on the fact that the buffer can contain the whole
1383 # raw stream, otherwise peek() can return less.
1384 b = bufio.peek(n)
1385 if n != -1:
1386 b = b[:n]
1387 bufio.seek(len(b), 1)
1388 return b
1389 self.check_flush_and_read(_peek)
1390
1391 def test_flush_and_write(self):
1392 raw = self.BytesIO(b"abcdefghi")
1393 bufio = self.tp(raw)
1394
1395 bufio.write(b"123")
1396 bufio.flush()
1397 bufio.write(b"45")
1398 bufio.flush()
1399 bufio.seek(0, 0)
1400 self.assertEquals(b"12345fghi", raw.getvalue())
1401 self.assertEquals(b"12345fghi", bufio.read())
1402
1403 def test_threads(self):
1404 BufferedReaderTest.test_threads(self)
1405 BufferedWriterTest.test_threads(self)
1406
1407 def test_writes_and_peek(self):
1408 def _peek(bufio):
1409 bufio.peek(1)
1410 self.check_writes(_peek)
1411 def _peek(bufio):
1412 pos = bufio.tell()
1413 bufio.seek(-1, 1)
1414 bufio.peek(1)
1415 bufio.seek(pos, 0)
1416 self.check_writes(_peek)
1417
1418 def test_writes_and_reads(self):
1419 def _read(bufio):
1420 bufio.seek(-1, 1)
1421 bufio.read(1)
1422 self.check_writes(_read)
1423
1424 def test_writes_and_read1s(self):
1425 def _read1(bufio):
1426 bufio.seek(-1, 1)
1427 bufio.read1(1)
1428 self.check_writes(_read1)
1429
1430 def test_writes_and_readintos(self):
1431 def _read(bufio):
1432 bufio.seek(-1, 1)
1433 bufio.readinto(bytearray(1))
1434 self.check_writes(_read)
1435
def test_write_after_readahead(self):
    # Issue #6629: writing after the buffer was filled by readahead should
    # first rewind the raw stream.
    for overwrite_size in (1, 5):
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 4)
        payload = b"B" * overwrite_size
        end_pos = overwrite_size + 1
        expected = b"A" + payload + b"A" * (9 - overwrite_size)
        # Trigger readahead
        first = bufio.read(1)
        self.assertEqual(first, b"A")
        self.assertEqual(bufio.tell(), 1)
        # Overwriting should rewind the raw stream if it needs so
        bufio.write(payload)
        self.assertEqual(bufio.tell(), end_pos)
        # If the write size was smaller than the buffer size, flush() and
        # check that rewind happens.
        bufio.flush()
        self.assertEqual(bufio.tell(), end_pos)
        self.assertEqual(raw.getvalue(), expected)
1455
def test_truncate_after_read_or_write(self):
    # truncate() without an argument must report the logical position,
    # both after a buffered read and after a buffered write.
    raw = self.BytesIO(b"A" * 10)
    bufio = self.tp(raw, 100)
    head = bufio.read(2)  # the read buffer gets filled
    self.assertEqual(head, b"AA")
    self.assertEqual(bufio.truncate(), 2)
    written = bufio.write(b"BB")  # the write buffer increases
    self.assertEqual(written, 2)
    self.assertEqual(bufio.truncate(), 4)
1463
def test_misbehaved_io(self):
    # Run the reader's misbehaved-IO test first, then the writer's, against
    # this test case.
    for base in (BufferedReaderTest, BufferedWriterTest):
        base.test_misbehaved_io(self)
1467
# You can't construct a BufferedRandom over a non-seekable stream.
# NOTE(review): binding the name to None presumably masks a test_unseekable
# inherited from the base test classes -- confirm against those classes.
test_unseekable = None
1470
class CBufferedRandomTest(BufferedRandomTest):
    # Runs the BufferedRandomTest suite against io.BufferedRandom.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel, so the oversized-buffer check
        # only runs when sys.maxsize exceeds 0x7FFFFFFF.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        oversize_errors = (OverflowError, MemoryError, ValueError)
        self.assertRaises(oversize_errors,
                          bufio.__init__, rawio, sys.maxsize)

    def test_garbage_collection(self):
        # Reader check first, then writer check.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)
1487
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the BufferedRandomTest suite against pyio.BufferedRandom.
    tp = pyio.BufferedRandom
1490
1491
# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
# properties:
#   - A single output character can correspond to many bytes of input.
#   - The number of input bytes to complete the character can be
#     undetermined until the last input byte is received.
#   - The number of input bytes can vary depending on previous input.
#   - A single input byte can correspond to many characters of output.
#   - The number of output characters can be undetermined until the
#     last input byte is received.
#   - The number of output characters can vary depending on previous input.

class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  EOF (a true *final* argument) also
    terminates the last word.
    """

    # lookupTestDecoder() only answers while this flag is true.
    codecEnabled = False

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Restore I = O = 1 and drop any buffered input."""
        self.i = self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags) with I and O packed into flags."""
        # Both values are XOR-ed with 1 so that flags = 0 after reset().
        flags = (self.i ^ 1) * 100 + (self.o ^ 1)
        return bytes(self.buffer), flags

    def setstate(self, state):
        """Restore a (buffered bytes, flags) pair produced by getstate()."""
        pending, flags = state
        self.buffer = bytearray(pending)
        packed_i, packed_o = divmod(flags, 100)
        self.i = packed_i ^ 1
        self.o = packed_o ^ 1

    def decode(self, input, final=False):
        """Feed *input* byte by byte and return the text of completed words."""
        pieces = []
        period = ord('.')
        for byte in input:
            if self.i:
                # fixed-length, terminate after self.i bytes
                self.buffer.append(byte)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
            elif byte != period:
                # variable-length: keep collecting until a period arrives
                self.buffer.append(byte)
            elif self.buffer:
                # a period ends the word; periods with nothing buffered
                # are ignored
                pieces.append(self.process_word())
        if final and self.buffer:  # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word and return the text it produces."""
        leader = self.buffer[0]
        if leader == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
            result = ''
        elif leader == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
            result = ''
        else:
            text = self.buffer.decode('ascii')
            if self.o:
                # pad out with hyphens, then truncate to output length
                text = text.ljust(self.o, '-')[:self.o]
            result = text + '.'
        self.buffer = bytearray()
        return result

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: answer only for an enabled 'test_decoder'."""
        if not cls.codecEnabled or name != 'test_decoder':
            return None
        latin1 = codecs.lookup('latin-1')
        return codecs.CodecInfo(
            name='test_decoder', encode=latin1.encode, decode=None,
            incrementalencoder=None,
            streamreader=None, streamwriter=None,
            incrementaldecoder=cls)
1587
# Register the previous decoder's lookup function for testing.
# Disabled by default (codecEnabled is False); tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1591
1592
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for input, eof, output in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(input, eof), output)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', True), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001635
1636class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001637
def setUp(self):
    # Sample input that mixes "\r\n", "\r" and "\n" line endings.
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    # The same five lines, each ending in a plain "\n", as text.
    normalized_bytes = b"AAA\nBBB\nCCC\nDDD\nEEE\n"
    self.normalized = normalized_bytes.decode("ascii")
    # Make sure no scratch file is left over before the test starts.
    support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001642
def tearDown(self):
    # Remove the scratch file (support.TESTFN) that the tests write to.
    support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001645
def test_constructor(self):
    # Re-running __init__ on an existing wrapper must apply the new
    # encoding / line_buffering settings and reject invalid newline values.
    r = self.BytesIO(b"\xc3\xa9\n\n")
    b = self.BufferedReader(r, 1000)
    t = self.TextIOWrapper(b)
    t.__init__(b, encoding="latin1", newline="\r\n")
    self.assertEqual(t.encoding, "latin1")
    self.assertEqual(t.line_buffering, False)
    t.__init__(b, encoding="utf8", line_buffering=True)
    self.assertEqual(t.encoding, "utf8")
    self.assertEqual(t.line_buffering, True)
    # b"\xc3\xa9" is U+00E9 in UTF-8, so the latest settings are in effect.
    self.assertEqual("\xe9\n", t.readline())
    self.assertRaises(TypeError, t.__init__, b, newline=42)
    self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1659
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)
    text = self.TextIOWrapper(buffered)
    # detach() hands back the very buffer object that was wrapped.
    detached = text.detach()
    self.assertIs(detached, buffered)

    text = self.TextIOWrapper(buffered, encoding="ascii")
    text.write("howdy")
    # Nothing has reached the raw stream yet ...
    self.assertFalse(raw.getvalue())
    text.detach()
    # ... but it has once the wrapper is detached.
    self.assertEqual(raw.getvalue(), b"howdy")
    # A second detach() on the same wrapper is an error.
    self.assertRaises(ValueError, text.detach)
1672
def test_repr(self):
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    wrapper = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    # No name attribute on the raw stream: only the encoding is shown.
    expected = "<%s.TextIOWrapper encoding='utf-8'>" % modname
    self.assertEqual(repr(wrapper), expected)

    # A str name set on the raw stream shows up in the repr.
    raw.name = "dummy"
    expected = "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(wrapper), expected)

    # So does a bytes name, rendered with its b prefix.
    raw.name = b"dummy"
    expected = "<%s.TextIOWrapper name=b'dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(wrapper), expected)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001686
def test_line_buffering(self):
    # With line_buffering=True, data reaches the raw stream once a write
    # contains "\n" or "\r".
    r = self.BytesIO()
    b = self.BufferedWriter(r, 1000)
    t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
    t.write("X")
    self.assertEqual(r.getvalue(), b"")  # No flush happened
    t.write("Y\nZ")
    self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed
    t.write("A\rB")
    self.assertEqual(r.getvalue(), b"XY\nZA\rB")
1697
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="utf8")
    self.assertEqual(t.encoding, "utf8")
    t = self.TextIOWrapper(b)
    self.assertIsNotNone(t.encoding)
    # codecs.lookup() raises LookupError for an unknown encoding name.
    codecs.lookup(t.encoding)
1706
def test_encoding_errors_reading(self):
    # The input contains b"\xff", which is not valid ASCII; each section
    # checks one error-handling mode.
    # (1) default
    b = self.BytesIO(b"abc\n\xff\n")
    t = self.TextIOWrapper(b, encoding="ascii")
    self.assertRaises(UnicodeError, t.read)
    # (2) explicit strict
    b = self.BytesIO(b"abc\n\xff\n")
    t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
    self.assertRaises(UnicodeError, t.read)
    # (3) ignore
    b = self.BytesIO(b"abc\n\xff\n")
    t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
    self.assertEqual(t.read(), "abc\n\n")
    # (4) replace
    b = self.BytesIO(b"abc\n\xff\n")
    t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
    self.assertEqual(t.read(), "abc\n\ufffd\n")
1724
def test_encoding_errors_writing(self):
    # "\xff" cannot be encoded as ASCII; each section checks one
    # error-handling mode.
    # (1) default
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="ascii")
    self.assertRaises(UnicodeError, t.write, "\xff")
    # (2) explicit strict
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
    self.assertRaises(UnicodeError, t.write, "\xff")
    # (3) ignore
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
                           newline="\n")
    t.write("abc\xffdef\n")
    t.flush()
    self.assertEqual(b.getvalue(), b"abcdef\n")
    # (4) replace
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
                           newline="\n")
    t.write("abc\xffdef\n")
    t.flush()
    self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001748
def test_newlines(self):
    input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]

    # Each entry pairs a newline argument with the lines expected back.
    tests = [
        [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
        [ '', input_lines ],
        [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
        [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
        [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        data = ''.join(input_lines).encode(encoding)
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    if do_reads:
                        # Mix read(2) with readline(): every line is
                        # rebuilt from its first two characters plus the
                        # remainder of the line.
                        got_lines = []
                        while True:
                            c2 = textio.read(2)
                            if c2 == '':
                                break
                            self.assertEqual(len(c2), 2)
                            got_lines.append(c2 + textio.readline())
                    else:
                        got_lines = list(textio)

                    for got_line, exp_line in zip(got_lines, exp_lines):
                        self.assertEqual(got_line, exp_line)
                    self.assertEqual(len(got_lines), len(exp_lines))
1790
def test_newlines_input(self):
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    # Each pair is (newline argument, lines expected from readlines()).
    for newline, expected in [
        (None, normalized.decode("ascii").splitlines(True)),
        ("", testdata.decode("ascii").splitlines(True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
        ]:
        buf = self.BytesIO(testdata)
        txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
        self.assertEqual(txt.readlines(), expected)
        # A full read() after rewinding must match the joined lines.
        txt.seek(0)
        self.assertEqual(txt.read(), "".join(expected))
1806
def test_newlines_output(self):
    # Maps a newline argument to the bytes expected after the writes below.
    testdict = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
        }
    # newline=None is expected to behave like the platform's os.linesep.
    tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
    for newline, expected in tests:
        buf = self.BytesIO()
        txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
        txt.write("AAA\nB")
        txt.write("BB\nCCC\n")
        txt.write("X\rY\r\nZ")
        txt.flush()
        self.assertEqual(buf.closed, False)
        self.assertEqual(buf.getvalue(), expected)
1824
def test_destructor(self):
    # Dropping the last reference to the wrapper must close the underlying
    # stream with the written text already in it.
    l = []
    base = self.BytesIO
    class MyBytesIO(base):
        def close(self):
            # Record the contents as they are when close() is called.
            l.append(self.getvalue())
            base.close(self)
    b = MyBytesIO()
    t = self.TextIOWrapper(b, encoding="ascii")
    t.write("abc")
    del t
    support.gc_collect()
    self.assertEqual([b"abc"], l)
1838
def test_override_destructor(self):
    # Overridden __del__, close() and flush() must each run once, in that
    # order, when the wrapper is destroyed.
    calls = []
    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # Chain to the parent's __del__ only if it provides one.
            parent_del = getattr(super(), "__del__", None)
            if parent_del is not None:
                parent_del()
        def close(self):
            calls.append(2)
            super().close()
        def flush(self):
            calls.append(3)
            super().flush()
    raw = self.BytesIO()
    wrapper = MyTextIO(raw, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
1861
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()
    def access_missing_attribute():
        self.TextIOWrapper(rawio).xyzzy
    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, access_missing_attribute)
    message = captured.getvalue().strip()
    if not message:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(message.splitlines()), 1)
    self.assertTrue(message.startswith("Exception IOError: "), message)
    self.assertTrue(message.endswith(" ignored"), message)
Guido van Rossum8358db22007-08-18 21:39:55 +00001876
Guido van Rossum9b76da62007-04-11 01:09:03 +00001877 # Systematic tests of the text I/O API
1878
def test_basic_io(self):
    # Write, read, seek and tell on a real file for a range of chunk sizes
    # and encodings.  The files are opened in "with" blocks so they are
    # closed even when an assertion fails.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        # "utf-16-be" and "utf-16-le" are not exercised here.
        for enc in ("ascii", "latin1", "utf8"):
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position cookie for the end of the three characters.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
1907
def multi_line_test(self, f, enc):
    # Helper: empty the file *f*, write lines of assorted lengths while
    # recording tell() before each one, then read them back with
    # readline() and check that positions and contents agree.
    # *enc* is accepted for the caller's convenience but not used here.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    wlines = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to build a line of *size*.
        line = "".join(sample[i % len(sample)] for i in range(size)) + "\n"
        wlines.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    rlines = []
    while True:
        pos = f.tell()
        line = f.readline()
        if not line:
            break
        rlines.append((pos, line))
    self.assertEqual(rlines, wlines)
1929
def test_telling(self):
    # tell() must return the same positions while reading back as it did
    # while writing.  The file is opened in a "with" block so it is closed
    # even when an assertion fails.
    with self.open(support.TESTFN, "w+", encoding="utf8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is expected to fail while the file is being iterated.
            self.assertRaises(IOError, f.tell)
        self.assertEqual(f.tell(), p2)
1949
def test_seeking(self):
    chunk_size = _default_chunk_size()
    # The ASCII prefix stops two bytes short of chunk_size, so the 3-byte
    # UTF-8 encoding of "\u8888" that follows spans that offset.
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = u_suffix.encode("utf-8")
    line = prefix + suffix
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    # Opened in a "with" block: the original never closed this handle.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        s = f.read(prefix_size)
        self.assertEqual(s, str(prefix, "ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
1967
def test_seeking_too(self):
    # Regression test for a specific bug
    data = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as f:
        f.write(data)
    # Opened in a "with" block: the original never closed this handle.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        # The test passes as long as neither call raises.
        f.readline()
        f.tell()
1979
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = True

    # Run the tests.
    try:
        # Try each test case.
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(data)

        # Position each test case so that it crosses a chunk boundary.
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(data)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + data, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = False
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002026
def test_encoded_writes(self):
    data = "1234567890"
    tests = ("utf-16",
             "utf-16-le",
             "utf-16-be",
             "utf-32",
             "utf-32-le",
             "utf-32-be")
    for encoding in tests:
        buf = self.BytesIO()
        f = self.TextIOWrapper(buf, encoding=encoding)
        # Check if the BOM is written only once (see issue1753).
        f.write(data)
        f.write(data)
        f.seek(0)
        self.assertEqual(f.read(), data * 2)
        # Rewind and read a second time: the result must be the same.
        f.seek(0)
        self.assertEqual(f.read(), data * 2)
        self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
2046
def test_unreadable(self):
    # Reading through a wrapper whose underlying stream reports
    # readable() == False must raise IOError.
    class UnReadable(self.BytesIO):
        def readable(self):
            return False
    wrapper = self.TextIOWrapper(UnReadable())
    with self.assertRaises(IOError):
        wrapper.read()
2053
def test_read_one_by_one(self):
    txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    # Read one character at a time until read() returns "" at EOF.
    reads = "".join(iter(lambda: txt.read(1), ""))
    self.assertEqual(reads, "AA\nBB")
2063
def test_readlines(self):
    wrapper = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
    all_lines = ["AA\n", "BB\n", "CC"]
    # No argument and an explicit None both return every line.
    self.assertEqual(wrapper.readlines(), all_lines)
    wrapper.seek(0)
    self.assertEqual(wrapper.readlines(None), all_lines)
    wrapper.seek(0)
    # With a hint of 5 only the first two lines come back.
    self.assertEqual(wrapper.readlines(5), all_lines[:2])
2071
# Read in amounts of 128 characters.
# NOTE(review): the original comment said 128 equals
# TextIOWrapper._CHUNK_SIZE; this test does not set it -- confirm.
def test_read_by_chunk(self):
    # make sure "\r\n" straddles 128 char boundary.
    txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
    # Keep reading 128 characters until read() returns "" at EOF.
    reads = "".join(iter(lambda: txt.read(128), ""))
    self.assertEqual(reads, "A"*127+"\nB")
2083
def test_issue1395_1(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")

    # read one char at a time until read() returns "" at EOF
    reads = "".join(iter(lambda: txt.read(1), ""))
    self.assertEqual(reads, self.normalized)
2095
def test_issue1395_2(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # Read four characters at a time (matching the chunk size set above)
    # until read() returns "" at EOF.
    reads = "".join(iter(lambda: txt.read(4), ""))
    self.assertEqual(reads, self.normalized)
2107
def test_issue1395_3(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # Two fixed-size reads followed by three readline() calls must
    # together reproduce the normalized text.
    reads = txt.read(4)
    reads += txt.read(4)
    reads += txt.readline()
    reads += txt.readline()
    reads += txt.readline()
    self.assertEqual(reads, self.normalized)
2118
2119 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002120 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002121 txt._CHUNK_SIZE = 4
2122
2123 reads = txt.read(4)
2124 reads += txt.read()
2125 self.assertEquals(reads, self.normalized)
2126
2127 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002128 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002129 txt._CHUNK_SIZE = 4
2130
2131 reads = txt.read(4)
2132 pos = txt.tell()
2133 txt.seek(0)
2134 txt.seek(pos)
2135 self.assertEquals(txt.read(4), "BBB\n")
2136
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002137 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002138 buffer = self.BytesIO(self.testdata)
2139 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002140
2141 self.assertEqual(buffer.seekable(), txt.seekable())
2142
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            # The returned position is not needed; the call itself is
            # kept from the original test.
            f.tell()
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaa'.encode(charset))

        with self.open(filename, 'a', encoding=charset) as f:
            f.write('xxx')
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
2157
def test_seek_bom(self):
    # Same test, but when seeking manually
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            pos = f.tell()
        with self.open(filename, 'r+', encoding=charset) as f:
            # Write at the old end of file first, then overwrite the start.
            f.seek(pos)
            f.write('zzz')
            f.seek(0)
            f.write('bbb')
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
2172
def test_errors_property(self):
    # The errors attribute is "strict" when not specified and otherwise
    # echoes the value passed to open().
    scenarios = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for open_kwargs, expected in scenarios:
        with self.open(support.TESTFN, "w", **open_kwargs) as f:
            self.assertEqual(f.errors, expected)
2178
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    event = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def run(n):
            text = "Thread%03d\n" % n
            # Every thread blocks here until the event is set.
            event.wait()
            f.write(text)
        threads = [threading.Thread(target=run, args=(x,))
                   for x in range(20)]
        for t in threads:
            t.start()
        time.sleep(0.02)
        event.set()
        for t in threads:
            t.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
        # Each thread's line must appear exactly once.
        for n in range(20):
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
2200
Antoine Pitrou6be88762010-05-03 16:48:20 +00002201 def test_flush_error_on_close(self):
2202 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2203 def bad_flush():
2204 raise IOError()
2205 txt.flush = bad_flush
2206 self.assertRaises(IOError, txt.close) # exception not swallowed
2207
2208 def test_multi_close(self):
2209 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2210 txt.close()
2211 txt.close()
2212 txt.close()
2213 self.assertRaises(ValueError, txt.flush)
2214
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002215 def test_unseekable(self):
2216 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2217 self.assertRaises(self.UnsupportedOperation, txt.tell)
2218 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2219
class CTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapper tests plus checks that only apply to the C version."""

    def test_initialization(self):
        # Re-running __init__ with a bad newline argument must raise, and
        # the wrapper must refuse to read afterwards.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        reader = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(reader)
        bad_newlines = ((TypeError, 42), (ValueError, 'xyzzy'))
        for expected_error, newline in bad_newlines:
            with self.assertRaises(expected_error):
                wrapper.__init__(reader, newline=newline)
            with self.assertRaises(ValueError):
                wrapper.read()

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        raw_file = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(raw_file)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        # Self-reference: the object can only be reclaimed by the cycle GC.
        wrapper.x = wrapper
        wr = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertTrue(wr() is None, wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")
2246
class PyTextIOWrapperTest(TextIOWrapperTest):
    """Inherited TextIOWrapper tests only; nothing is added or overridden."""
2249
2250
2251class IncrementalNewlineDecoderTest(unittest.TestCase):
2252
2253 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002254 # UTF-8 specific tests for a newline decoder
2255 def _check_decode(b, s, **kwargs):
2256 # We exercise getstate() / setstate() as well as decode()
2257 state = decoder.getstate()
2258 self.assertEquals(decoder.decode(b, **kwargs), s)
2259 decoder.setstate(state)
2260 self.assertEquals(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002261
Antoine Pitrou180a3362008-12-14 16:36:46 +00002262 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002263
Antoine Pitrou180a3362008-12-14 16:36:46 +00002264 _check_decode(b'\xe8', "")
2265 _check_decode(b'\xa2', "")
2266 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002267
Antoine Pitrou180a3362008-12-14 16:36:46 +00002268 _check_decode(b'\xe8', "")
2269 _check_decode(b'\xa2', "")
2270 _check_decode(b'\x88', "\u8888")
2271
2272 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002273 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2274
Antoine Pitrou180a3362008-12-14 16:36:46 +00002275 decoder.reset()
2276 _check_decode(b'\n', "\n")
2277 _check_decode(b'\r', "")
2278 _check_decode(b'', "\n", final=True)
2279 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002280
Antoine Pitrou180a3362008-12-14 16:36:46 +00002281 _check_decode(b'\r', "")
2282 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002283
Antoine Pitrou180a3362008-12-14 16:36:46 +00002284 _check_decode(b'\r\r\n', "\n\n")
2285 _check_decode(b'\r', "")
2286 _check_decode(b'\r', "\n")
2287 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002288
Antoine Pitrou180a3362008-12-14 16:36:46 +00002289 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2290 _check_decode(b'\xe8\xa2\x88', "\u8888")
2291 _check_decode(b'\n', "\n")
2292 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2293 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002294
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002295 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002296 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002297 if encoding is not None:
2298 encoder = codecs.getincrementalencoder(encoding)()
2299 def _decode_bytewise(s):
2300 # Decode one byte at a time
2301 for b in encoder.encode(s):
2302 result.append(decoder.decode(bytes([b])))
2303 else:
2304 encoder = None
2305 def _decode_bytewise(s):
2306 # Decode one char at a time
2307 for c in s:
2308 result.append(decoder.decode(c))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002309 self.assertEquals(decoder.newlines, None)
2310 _decode_bytewise("abc\n\r")
2311 self.assertEquals(decoder.newlines, '\n')
2312 _decode_bytewise("\nabc")
2313 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2314 _decode_bytewise("abc\r")
2315 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2316 _decode_bytewise("abc")
2317 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
2318 _decode_bytewise("abc\r")
2319 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
2320 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002321 input = "abc"
2322 if encoder is not None:
2323 encoder.reset()
2324 input = encoder.encode(input)
2325 self.assertEquals(decoder.decode(input), "abc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002326 self.assertEquals(decoder.newlines, None)
2327
2328 def test_newline_decoder(self):
2329 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002330 # None meaning the IncrementalNewlineDecoder takes unicode input
2331 # rather than bytes input
2332 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002333 'utf-16', 'utf-16-le', 'utf-16-be',
2334 'utf-32', 'utf-32-le', 'utf-32-be',
2335 )
2336 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002337 decoder = enc and codecs.getincrementaldecoder(enc)()
2338 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2339 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002340 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002341 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2342 self.check_newline_decoding_utf8(decoder)
2343
Antoine Pitrou66913e22009-03-06 23:40:56 +00002344 def test_newline_bytes(self):
2345 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2346 def _check(dec):
2347 self.assertEquals(dec.newlines, None)
2348 self.assertEquals(dec.decode("\u0D00"), "\u0D00")
2349 self.assertEquals(dec.newlines, None)
2350 self.assertEquals(dec.decode("\u0A00"), "\u0A00")
2351 self.assertEquals(dec.newlines, None)
2352 dec = self.IncrementalNewlineDecoder(None, translate=False)
2353 _check(dec)
2354 dec = self.IncrementalNewlineDecoder(None, translate=True)
2355 _check(dec)
2356
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Inherited newline-decoder tests only; nothing is added or overridden."""
2359
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Inherited newline-decoder tests only; nothing is added or overridden."""
Antoine Pitrou180a3362008-12-14 16:36:46 +00002362
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002363
Guido van Rossum01a27522007-03-07 01:00:12 +00002364# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002365
Guido van Rossum5abbf752007-08-27 17:39:33 +00002366class MiscIOTest(unittest.TestCase):
2367
Barry Warsaw40e82462008-11-20 20:14:50 +00002368 def tearDown(self):
2369 support.unlink(support.TESTFN)
2370
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002371 def test___all__(self):
2372 for name in self.io.__all__:
2373 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002374 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002375 if name == "open":
2376 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002377 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002378 self.assertTrue(issubclass(obj, Exception), name)
2379 elif not name.startswith("SEEK_"):
2380 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002381
Barry Warsaw40e82462008-11-20 20:14:50 +00002382 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002383 f = self.open(support.TESTFN, "wb", buffering=0)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002384 self.assertEquals(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002385 f.close()
2386
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002387 f = self.open(support.TESTFN, "U")
Barry Warsaw40e82462008-11-20 20:14:50 +00002388 self.assertEquals(f.name, support.TESTFN)
2389 self.assertEquals(f.buffer.name, support.TESTFN)
2390 self.assertEquals(f.buffer.raw.name, support.TESTFN)
2391 self.assertEquals(f.mode, "U")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002392 self.assertEquals(f.buffer.mode, "rb")
2393 self.assertEquals(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002394 f.close()
2395
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002396 f = self.open(support.TESTFN, "w+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002397 self.assertEquals(f.mode, "w+")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002398 self.assertEquals(f.buffer.mode, "rb+") # Does it really matter?
2399 self.assertEquals(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002400
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002401 g = self.open(f.fileno(), "wb", closefd=False)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002402 self.assertEquals(g.mode, "wb")
2403 self.assertEquals(g.raw.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002404 self.assertEquals(g.name, f.fileno())
2405 self.assertEquals(g.raw.name, f.fileno())
2406 f.close()
2407 g.close()
2408
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002409 def test_io_after_close(self):
2410 for kwargs in [
2411 {"mode": "w"},
2412 {"mode": "wb"},
2413 {"mode": "w", "buffering": 1},
2414 {"mode": "w", "buffering": 2},
2415 {"mode": "wb", "buffering": 0},
2416 {"mode": "r"},
2417 {"mode": "rb"},
2418 {"mode": "r", "buffering": 1},
2419 {"mode": "r", "buffering": 2},
2420 {"mode": "rb", "buffering": 0},
2421 {"mode": "w+"},
2422 {"mode": "w+b"},
2423 {"mode": "w+", "buffering": 1},
2424 {"mode": "w+", "buffering": 2},
2425 {"mode": "w+b", "buffering": 0},
2426 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002427 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002428 f.close()
2429 self.assertRaises(ValueError, f.flush)
2430 self.assertRaises(ValueError, f.fileno)
2431 self.assertRaises(ValueError, f.isatty)
2432 self.assertRaises(ValueError, f.__iter__)
2433 if hasattr(f, "peek"):
2434 self.assertRaises(ValueError, f.peek, 1)
2435 self.assertRaises(ValueError, f.read)
2436 if hasattr(f, "read1"):
2437 self.assertRaises(ValueError, f.read1, 1024)
2438 if hasattr(f, "readinto"):
2439 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2440 self.assertRaises(ValueError, f.readline)
2441 self.assertRaises(ValueError, f.readlines)
2442 self.assertRaises(ValueError, f.seek, 0)
2443 self.assertRaises(ValueError, f.tell)
2444 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002445 self.assertRaises(ValueError, f.write,
2446 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002447 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002448 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002449
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002450 def test_blockingioerror(self):
2451 # Various BlockingIOError issues
2452 self.assertRaises(TypeError, self.BlockingIOError)
2453 self.assertRaises(TypeError, self.BlockingIOError, 1)
2454 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2455 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2456 b = self.BlockingIOError(1, "")
2457 self.assertEqual(b.characters_written, 0)
2458 class C(str):
2459 pass
2460 c = C("")
2461 b = self.BlockingIOError(1, c)
2462 c.b = b
2463 b.c = c
2464 wr = weakref.ref(c)
2465 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002466 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002467 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002468
2469 def test_abcs(self):
2470 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002471 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2472 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2473 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2474 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002475
2476 def _check_abc_inheritance(self, abcmodule):
2477 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002478 self.assertIsInstance(f, abcmodule.IOBase)
2479 self.assertIsInstance(f, abcmodule.RawIOBase)
2480 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2481 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002482 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002483 self.assertIsInstance(f, abcmodule.IOBase)
2484 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2485 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2486 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002487 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002488 self.assertIsInstance(f, abcmodule.IOBase)
2489 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2490 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2491 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002492
2493 def test_abc_inheritance(self):
2494 # Test implementations inherit from their respective ABCs
2495 self._check_abc_inheritance(self)
2496
2497 def test_abc_inheritance_official(self):
2498 # Test implementations inherit from the official ABCs of the
2499 # baseline "io" module.
2500 self._check_abc_inheritance(io)
2501
class CMiscIOTest(MiscIOTest):
    """MiscIOTest run with the io module as the module under test."""

    io = io
2504
class PyMiscIOTest(MiscIOTest):
    """MiscIOTest run with the pyio module as the module under test."""

    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002507
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002508
2509@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2510class SignalsTest(unittest.TestCase):
2511
2512 def setUp(self):
2513 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2514
2515 def tearDown(self):
2516 signal.signal(signal.SIGALRM, self.oldalrm)
2517
2518 def alarm_interrupt(self, sig, frame):
2519 1/0
2520
2521 @unittest.skipUnless(threading, 'Threading required for this test.')
2522 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2523 """Check that a partial write, when it gets interrupted, properly
2524 invokes the signal handler."""
2525 read_results = []
2526 def _read():
2527 s = os.read(r, 1)
2528 read_results.append(s)
2529 t = threading.Thread(target=_read)
2530 t.daemon = True
2531 r, w = os.pipe()
2532 try:
2533 wio = self.io.open(w, **fdopen_kwargs)
2534 t.start()
2535 signal.alarm(1)
2536 # Fill the pipe enough that the write will be blocking.
2537 # It will be interrupted by the timer armed above. Since the
2538 # other thread has read one byte, the low-level write will
2539 # return with a successful (partial) result rather than an EINTR.
2540 # The buffered IO layer must check for pending signal
2541 # handlers, which in this case will invoke alarm_interrupt().
2542 self.assertRaises(ZeroDivisionError,
2543 wio.write, item * (1024 * 1024))
2544 t.join()
2545 # We got one byte, get another one and check that it isn't a
2546 # repeat of the first one.
2547 read_results.append(os.read(r, 1))
2548 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2549 finally:
2550 os.close(w)
2551 os.close(r)
2552 # This is deliberate. If we didn't close the file descriptor
2553 # before closing wio, wio would try to flush its internal
2554 # buffer, and block again.
2555 try:
2556 wio.close()
2557 except IOError as e:
2558 if e.errno != errno.EBADF:
2559 raise
2560
2561 def test_interrupted_write_unbuffered(self):
2562 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2563
2564 def test_interrupted_write_buffered(self):
2565 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2566
2567 def test_interrupted_write_text(self):
2568 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2569
class CSignalsTest(SignalsTest):
    """SignalsTest run with the io module as the module under test."""

    io = io
2572
class PySignalsTest(SignalsTest):
    """SignalsTest run with the pyio module as the module under test."""

    io = pyio
2575
2576
def test_main():
    """Bind implementation names onto the test classes and run them all.

    Classes whose name starts with "C" receive the names from io, classes
    whose name starts with "Py" receive the names from pyio; any other
    class is run unchanged.
    """
    test_classes = (
        CIOTest, PyIOTest,
        CBufferedReaderTest, PyBufferedReaderTest,
        CBufferedWriterTest, PyBufferedWriterTest,
        CBufferedRWPairTest, PyBufferedRWPairTest,
        CBufferedRandomTest, PyBufferedRandomTest,
        StatefulIncrementalDecoderTest,
        CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
        CTextIOWrapperTest, PyTextIOWrapperTest,
        CMiscIOTest, PyMiscIOTest,
        CSignalsTest, PySignalsTest,
    )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mock_classes = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
                    MockNonBlockWriterIO, MockUnseekableIO)
    exported = io.__all__ + ["IncrementalNewlineDecoder"]
    module_globals = globals()

    namespaces = {}
    for prefix, module in (("C", io), ("Py", pyio)):
        namespace = {name: getattr(module, name) for name in exported}
        # Each mock has a prefixed variant in this module's globals; expose
        # it to the tests under the unprefixed name.
        for mock in mock_classes:
            namespace[mock.__name__] = module_globals[prefix + mock.__name__]
        namespaces[prefix] = namespace
    # Avoid turning open into a bound method.
    namespaces["Py"]["open"] = pyio.OpenWrapper

    for test_class in test_classes:
        # "C" is tried before "Py"; only the first matching prefix applies.
        for prefix in ("C", "Py"):
            if test_class.__name__.startswith(prefix):
                for attr, value in namespaces[prefix].items():
                    setattr(test_class, attr, value)
                break

    support.run_unittest(*test_classes)
Guido van Rossum28524c72007-02-27 05:47:44 +00002611
# Run the whole suite through test_main() when this file is executed as a
# script rather than imported.
if __name__ == "__main__":
    test_main()