blob: 7ce9753a389149b93c796596bb1c6eebf6c8736c [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import random
Guido van Rossum28524c72007-02-27 05:47:44 +000027import unittest
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000028import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000029import abc
Antoine Pitroub46b9d52010-08-21 19:09:32 +000030import signal
31import errno
Antoine Pitroue033e062010-10-29 10:38:18 +000032import warnings
Georg Brandl1b37e872010-03-14 10:45:50 +000033from itertools import cycle, count
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000034from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000035from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000036
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000037import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000038import io # C implementation of io
39import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000040try:
41 import threading
42except ImportError:
43 threading = None
Guido van Rossum28524c72007-02-27 05:47:44 +000044
Guido van Rossuma9e20242007-03-08 00:43:48 +000045
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000046def _default_chunk_size():
47 """Get the default TextIOWrapper chunk size"""
48 with open(__file__, "r", encoding="latin1") as f:
49 return f._CHUNK_SIZE
50
51
Antoine Pitrou328ec742010-09-14 18:37:24 +000052class MockRawIOWithoutRead:
53 """A RawIO implementation without read(), so as to exercise the default
54 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000055
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000056 def __init__(self, read_stack=()):
57 self._read_stack = list(read_stack)
58 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000059 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000060 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000061
Guido van Rossum01a27522007-03-07 01:00:12 +000062 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000063 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000064 return len(b)
65
66 def writable(self):
67 return True
68
Guido van Rossum68bbcd22007-02-27 17:19:33 +000069 def fileno(self):
70 return 42
71
72 def readable(self):
73 return True
74
Guido van Rossum01a27522007-03-07 01:00:12 +000075 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000076 return True
77
Guido van Rossum01a27522007-03-07 01:00:12 +000078 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000079 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000080
81 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000082 return 0 # same comment as above
83
84 def readinto(self, buf):
85 self._reads += 1
86 max_len = len(buf)
87 try:
88 data = self._read_stack[0]
89 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +000090 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000091 return 0
92 if data is None:
93 del self._read_stack[0]
94 return None
95 n = len(data)
96 if len(data) <= max_len:
97 del self._read_stack[0]
98 buf[:n] = data
99 return n
100 else:
101 buf[:] = data[:max_len]
102 self._read_stack[0] = data[max_len:]
103 return max_len
104
105 def truncate(self, pos=None):
106 return pos
107
Antoine Pitrou328ec742010-09-14 18:37:24 +0000108class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
109 pass
110
111class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
112 pass
113
114
115class MockRawIO(MockRawIOWithoutRead):
116
117 def read(self, n=None):
118 self._reads += 1
119 try:
120 return self._read_stack.pop(0)
121 except:
122 self._extraneous_reads += 1
123 return b""
124
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000125class CMockRawIO(MockRawIO, io.RawIOBase):
126 pass
127
128class PyMockRawIO(MockRawIO, pyio.RawIOBase):
129 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000130
Guido van Rossuma9e20242007-03-08 00:43:48 +0000131
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000132class MisbehavedRawIO(MockRawIO):
133 def write(self, b):
134 return super().write(b) * 2
135
136 def read(self, n=None):
137 return super().read(n) * 2
138
139 def seek(self, pos, whence):
140 return -123
141
142 def tell(self):
143 return -456
144
145 def readinto(self, buf):
146 super().readinto(buf)
147 return len(buf) * 5
148
149class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
150 pass
151
152class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
153 pass
154
155
156class CloseFailureIO(MockRawIO):
157 closed = 0
158
159 def close(self):
160 if not self.closed:
161 self.closed = 1
162 raise IOError
163
164class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
165 pass
166
167class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
168 pass
169
170
171class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000172
173 def __init__(self, data):
174 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000175 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000176
177 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000178 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000179 self.read_history.append(None if res is None else len(res))
180 return res
181
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000182 def readinto(self, b):
183 res = super().readinto(b)
184 self.read_history.append(res)
185 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000186
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000187class CMockFileIO(MockFileIO, io.BytesIO):
188 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000189
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000190class PyMockFileIO(MockFileIO, pyio.BytesIO):
191 pass
192
193
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000194class MockUnseekableIO:
195 def seekable(self):
196 return False
197
198 def seek(self, *args):
199 raise self.UnsupportedOperation("not seekable")
200
201 def tell(self, *args):
202 raise self.UnsupportedOperation("not seekable")
203
204class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
205 UnsupportedOperation = io.UnsupportedOperation
206
207class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
208 UnsupportedOperation = pyio.UnsupportedOperation
209
210
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000211class MockNonBlockWriterIO:
212
213 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000214 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000215 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000216
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000217 def pop_written(self):
218 s = b"".join(self._write_stack)
219 self._write_stack[:] = []
220 return s
221
222 def block_on(self, char):
223 """Block when a given char is encountered."""
224 self._blocker_char = char
225
226 def readable(self):
227 return True
228
229 def seekable(self):
230 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000231
Guido van Rossum01a27522007-03-07 01:00:12 +0000232 def writable(self):
233 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000234
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000235 def write(self, b):
236 b = bytes(b)
237 n = -1
238 if self._blocker_char:
239 try:
240 n = b.index(self._blocker_char)
241 except ValueError:
242 pass
243 else:
244 self._blocker_char = None
245 self._write_stack.append(b[:n])
246 raise self.BlockingIOError(0, "test blocking", n)
247 self._write_stack.append(b)
248 return len(b)
249
250class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
251 BlockingIOError = io.BlockingIOError
252
253class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
254 BlockingIOError = pyio.BlockingIOError
255
Guido van Rossuma9e20242007-03-08 00:43:48 +0000256
Guido van Rossum28524c72007-02-27 05:47:44 +0000257class IOTest(unittest.TestCase):
258
Neal Norwitze7789b12008-03-24 06:18:09 +0000259 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000260 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000261
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000262 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000263 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000264
Guido van Rossum28524c72007-02-27 05:47:44 +0000265 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000266 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000267 f.truncate(0)
268 self.assertEqual(f.tell(), 5)
269 f.seek(0)
270
271 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000272 self.assertEqual(f.seek(0), 0)
273 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000274 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000275 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000276 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000277 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000278 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000279 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000280 self.assertEqual(f.seek(-1, 2), 13)
281 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000282
Guido van Rossum87429772007-04-10 21:06:59 +0000283 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000284 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000285 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000286
Guido van Rossum9b76da62007-04-11 01:09:03 +0000287 def read_ops(self, f, buffered=False):
288 data = f.read(5)
289 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000290 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000291 self.assertEqual(f.readinto(data), 5)
292 self.assertEqual(data, b" worl")
293 self.assertEqual(f.readinto(data), 2)
294 self.assertEqual(len(data), 5)
295 self.assertEqual(data[:2], b"d\n")
296 self.assertEqual(f.seek(0), 0)
297 self.assertEqual(f.read(20), b"hello world\n")
298 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000299 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000300 self.assertEqual(f.seek(-6, 2), 6)
301 self.assertEqual(f.read(5), b"world")
302 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000303 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000304 self.assertEqual(f.seek(-6, 1), 5)
305 self.assertEqual(f.read(5), b" worl")
306 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000307 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000308 if buffered:
309 f.seek(0)
310 self.assertEqual(f.read(), b"hello world\n")
311 f.seek(6)
312 self.assertEqual(f.read(), b"world\n")
313 self.assertEqual(f.read(), b"")
314
Guido van Rossum34d69e52007-04-10 20:08:41 +0000315 LARGE = 2**31
316
Guido van Rossum53807da2007-04-10 19:01:47 +0000317 def large_file_ops(self, f):
318 assert f.readable()
319 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000320 self.assertEqual(f.seek(self.LARGE), self.LARGE)
321 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000322 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000323 self.assertEqual(f.tell(), self.LARGE + 3)
324 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000325 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000326 self.assertEqual(f.tell(), self.LARGE + 2)
327 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000328 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000329 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000330 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
331 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000332 self.assertEqual(f.read(2), b"x")
333
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000334 def test_invalid_operations(self):
335 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000336 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000337 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000338 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000339 self.assertRaises(exc, fp.read)
340 self.assertRaises(exc, fp.readline)
341 with self.open(support.TESTFN, "wb", buffering=0) as fp:
342 self.assertRaises(exc, fp.read)
343 self.assertRaises(exc, fp.readline)
344 with self.open(support.TESTFN, "rb", buffering=0) as fp:
345 self.assertRaises(exc, fp.write, b"blah")
346 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000347 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000348 self.assertRaises(exc, fp.write, b"blah")
349 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000350 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000351 self.assertRaises(exc, fp.write, "blah")
352 self.assertRaises(exc, fp.writelines, ["blah\n"])
353 # Non-zero seeking from current or end pos
354 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
355 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000356
Guido van Rossum28524c72007-02-27 05:47:44 +0000357 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000358 with self.open(support.TESTFN, "wb", buffering=0) as f:
359 self.assertEqual(f.readable(), False)
360 self.assertEqual(f.writable(), True)
361 self.assertEqual(f.seekable(), True)
362 self.write_ops(f)
363 with self.open(support.TESTFN, "rb", buffering=0) as f:
364 self.assertEqual(f.readable(), True)
365 self.assertEqual(f.writable(), False)
366 self.assertEqual(f.seekable(), True)
367 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000368
Guido van Rossum87429772007-04-10 21:06:59 +0000369 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000370 with self.open(support.TESTFN, "wb") as f:
371 self.assertEqual(f.readable(), False)
372 self.assertEqual(f.writable(), True)
373 self.assertEqual(f.seekable(), True)
374 self.write_ops(f)
375 with self.open(support.TESTFN, "rb") as f:
376 self.assertEqual(f.readable(), True)
377 self.assertEqual(f.writable(), False)
378 self.assertEqual(f.seekable(), True)
379 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000380
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000381 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000382 with self.open(support.TESTFN, "wb") as f:
383 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
384 with self.open(support.TESTFN, "rb") as f:
385 self.assertEqual(f.readline(), b"abc\n")
386 self.assertEqual(f.readline(10), b"def\n")
387 self.assertEqual(f.readline(2), b"xy")
388 self.assertEqual(f.readline(4), b"zzy\n")
389 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000390 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000391 self.assertRaises(TypeError, f.readline, 5.3)
392 with self.open(support.TESTFN, "r") as f:
393 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000394
Guido van Rossum28524c72007-02-27 05:47:44 +0000395 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000396 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000397 self.write_ops(f)
398 data = f.getvalue()
399 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000400 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000401 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000402
Guido van Rossum53807da2007-04-10 19:01:47 +0000403 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000404 # On Windows and Mac OSX this test comsumes large resources; It takes
405 # a long time to build the >2GB file and takes >2GB of disk space
406 # therefore the resource must be enabled to run this test.
407 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000408 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000409 print("\nTesting large file ops skipped on %s." % sys.platform,
410 file=sys.stderr)
411 print("It requires %d bytes and a long time." % self.LARGE,
412 file=sys.stderr)
413 print("Use 'regrtest.py -u largefile test_io' to run it.",
414 file=sys.stderr)
415 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000416 with self.open(support.TESTFN, "w+b", 0) as f:
417 self.large_file_ops(f)
418 with self.open(support.TESTFN, "w+b") as f:
419 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000420
421 def test_with_open(self):
422 for bufsize in (0, 1, 100):
423 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000424 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000425 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000426 self.assertEqual(f.closed, True)
427 f = None
428 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000429 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000430 1/0
431 except ZeroDivisionError:
432 self.assertEqual(f.closed, True)
433 else:
434 self.fail("1/0 didn't raise an exception")
435
Antoine Pitrou08838b62009-01-21 00:55:13 +0000436 # issue 5008
437 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000438 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000439 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000440 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000441 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000442 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000443 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000444 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000445 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000446
Guido van Rossum87429772007-04-10 21:06:59 +0000447 def test_destructor(self):
448 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000449 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000450 def __del__(self):
451 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000452 try:
453 f = super().__del__
454 except AttributeError:
455 pass
456 else:
457 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000458 def close(self):
459 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000460 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000461 def flush(self):
462 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000463 super().flush()
464 f = MyFileIO(support.TESTFN, "wb")
465 f.write(b"xxx")
466 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000467 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000468 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000469 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000470 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000471
472 def _check_base_destructor(self, base):
473 record = []
474 class MyIO(base):
475 def __init__(self):
476 # This exercises the availability of attributes on object
477 # destruction.
478 # (in the C version, close() is called by the tp_dealloc
479 # function, not by __del__)
480 self.on_del = 1
481 self.on_close = 2
482 self.on_flush = 3
483 def __del__(self):
484 record.append(self.on_del)
485 try:
486 f = super().__del__
487 except AttributeError:
488 pass
489 else:
490 f()
491 def close(self):
492 record.append(self.on_close)
493 super().close()
494 def flush(self):
495 record.append(self.on_flush)
496 super().flush()
497 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000498 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000499 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000500 self.assertEqual(record, [1, 2, 3])
501
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000502 def test_IOBase_destructor(self):
503 self._check_base_destructor(self.IOBase)
504
505 def test_RawIOBase_destructor(self):
506 self._check_base_destructor(self.RawIOBase)
507
508 def test_BufferedIOBase_destructor(self):
509 self._check_base_destructor(self.BufferedIOBase)
510
511 def test_TextIOBase_destructor(self):
512 self._check_base_destructor(self.TextIOBase)
513
Guido van Rossum87429772007-04-10 21:06:59 +0000514 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000515 with self.open(support.TESTFN, "wb") as f:
516 f.write(b"xxx")
517 with self.open(support.TESTFN, "rb") as f:
518 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000519
Guido van Rossumd4103952007-04-12 05:44:49 +0000520 def test_array_writes(self):
521 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000522 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000523 with self.open(support.TESTFN, "wb", 0) as f:
524 self.assertEqual(f.write(a), n)
525 with self.open(support.TESTFN, "wb") as f:
526 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000527
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000528 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000529 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000530 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000531
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000532 def test_read_closed(self):
533 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000534 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000535 with self.open(support.TESTFN, "r") as f:
536 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000537 self.assertEqual(file.read(), "egg\n")
538 file.seek(0)
539 file.close()
540 self.assertRaises(ValueError, file.read)
541
542 def test_no_closefd_with_filename(self):
543 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000544 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000545
546 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000547 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000548 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000549 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000550 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000551 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000552 self.assertEqual(file.buffer.raw.closefd, False)
553
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000554 def test_garbage_collection(self):
555 # FileIO objects are collected, and collecting them flushes
556 # all data to disk.
557 f = self.FileIO(support.TESTFN, "wb")
558 f.write(b"abcxxx")
559 f.f = f
560 wr = weakref.ref(f)
561 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000562 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000563 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000564 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000565 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000566
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000567 def test_unbounded_file(self):
568 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
569 zero = "/dev/zero"
570 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000571 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000572 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000573 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000574 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000575 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000576 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000577 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000578 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000579 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000580 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000581 self.assertRaises(OverflowError, f.read)
582
Antoine Pitrou6be88762010-05-03 16:48:20 +0000583 def test_flush_error_on_close(self):
584 f = self.open(support.TESTFN, "wb", buffering=0)
585 def bad_flush():
586 raise IOError()
587 f.flush = bad_flush
588 self.assertRaises(IOError, f.close) # exception not swallowed
589
590 def test_multi_close(self):
591 f = self.open(support.TESTFN, "wb", buffering=0)
592 f.close()
593 f.close()
594 f.close()
595 self.assertRaises(ValueError, f.flush)
596
Antoine Pitrou328ec742010-09-14 18:37:24 +0000597 def test_RawIOBase_read(self):
598 # Exercise the default RawIOBase.read() implementation (which calls
599 # readinto() internally).
600 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
601 self.assertEqual(rawio.read(2), b"ab")
602 self.assertEqual(rawio.read(2), b"c")
603 self.assertEqual(rawio.read(2), b"d")
604 self.assertEqual(rawio.read(2), None)
605 self.assertEqual(rawio.read(2), b"ef")
606 self.assertEqual(rawio.read(2), b"g")
607 self.assertEqual(rawio.read(2), None)
608 self.assertEqual(rawio.read(2), b"")
609
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000610class CIOTest(IOTest):
611 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000612
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000613class PyIOTest(IOTest):
614 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000615
Guido van Rossuma9e20242007-03-08 00:43:48 +0000616
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000617class CommonBufferedTests:
618 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
619
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000620 def test_detach(self):
621 raw = self.MockRawIO()
622 buf = self.tp(raw)
623 self.assertIs(buf.detach(), raw)
624 self.assertRaises(ValueError, buf.detach)
625
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000626 def test_fileno(self):
627 rawio = self.MockRawIO()
628 bufio = self.tp(rawio)
629
630 self.assertEquals(42, bufio.fileno())
631
632 def test_no_fileno(self):
633 # XXX will we always have fileno() function? If so, kill
634 # this test. Else, write it.
635 pass
636
637 def test_invalid_args(self):
638 rawio = self.MockRawIO()
639 bufio = self.tp(rawio)
640 # Invalid whence
641 self.assertRaises(ValueError, bufio.seek, 0, -1)
642 self.assertRaises(ValueError, bufio.seek, 0, 3)
643
644 def test_override_destructor(self):
645 tp = self.tp
646 record = []
647 class MyBufferedIO(tp):
648 def __del__(self):
649 record.append(1)
650 try:
651 f = super().__del__
652 except AttributeError:
653 pass
654 else:
655 f()
656 def close(self):
657 record.append(2)
658 super().close()
659 def flush(self):
660 record.append(3)
661 super().flush()
662 rawio = self.MockRawIO()
663 bufio = MyBufferedIO(rawio)
664 writable = bufio.writable()
665 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000666 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000667 if writable:
668 self.assertEqual(record, [1, 2, 3])
669 else:
670 self.assertEqual(record, [1, 2])
671
672 def test_context_manager(self):
673 # Test usability as a context manager
674 rawio = self.MockRawIO()
675 bufio = self.tp(rawio)
676 def _with():
677 with bufio:
678 pass
679 _with()
680 # bufio should now be closed, and using it a second time should raise
681 # a ValueError.
682 self.assertRaises(ValueError, _with)
683
684 def test_error_through_destructor(self):
685 # Test that the exception state is not modified by a destructor,
686 # even if close() fails.
687 rawio = self.CloseFailureIO()
688 def f():
689 self.tp(rawio).xyzzy
690 with support.captured_output("stderr") as s:
691 self.assertRaises(AttributeError, f)
692 s = s.getvalue().strip()
693 if s:
694 # The destructor *may* have printed an unraisable error, check it
695 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000696 self.assertTrue(s.startswith("Exception IOError: "), s)
697 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000698
Antoine Pitrou716c4442009-05-23 19:04:03 +0000699 def test_repr(self):
700 raw = self.MockRawIO()
701 b = self.tp(raw)
702 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
703 self.assertEqual(repr(b), "<%s>" % clsname)
704 raw.name = "dummy"
705 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
706 raw.name = b"dummy"
707 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
708
Antoine Pitrou6be88762010-05-03 16:48:20 +0000709 def test_flush_error_on_close(self):
710 raw = self.MockRawIO()
711 def bad_flush():
712 raise IOError()
713 raw.flush = bad_flush
714 b = self.tp(raw)
715 self.assertRaises(IOError, b.close) # exception not swallowed
716
717 def test_multi_close(self):
718 raw = self.MockRawIO()
719 b = self.tp(raw)
720 b.close()
721 b.close()
722 b.close()
723 self.assertRaises(ValueError, b.flush)
724
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000725 def test_unseekable(self):
726 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
727 self.assertRaises(self.UnsupportedOperation, bufio.tell)
728 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
729
Guido van Rossum78892e42007-04-06 17:31:18 +0000730
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000731class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
732 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000733
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000734 def test_constructor(self):
735 rawio = self.MockRawIO([b"abc"])
736 bufio = self.tp(rawio)
737 bufio.__init__(rawio)
738 bufio.__init__(rawio, buffer_size=1024)
739 bufio.__init__(rawio, buffer_size=16)
740 self.assertEquals(b"abc", bufio.read())
741 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
742 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
743 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
744 rawio = self.MockRawIO([b"abc"])
745 bufio.__init__(rawio)
746 self.assertEquals(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000747
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000748 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000749 for arg in (None, 7):
750 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
751 bufio = self.tp(rawio)
752 self.assertEquals(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000753 # Invalid args
754 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000755
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000756 def test_read1(self):
757 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
758 bufio = self.tp(rawio)
759 self.assertEquals(b"a", bufio.read(1))
760 self.assertEquals(b"b", bufio.read1(1))
761 self.assertEquals(rawio._reads, 1)
762 self.assertEquals(b"c", bufio.read1(100))
763 self.assertEquals(rawio._reads, 1)
764 self.assertEquals(b"d", bufio.read1(100))
765 self.assertEquals(rawio._reads, 2)
766 self.assertEquals(b"efg", bufio.read1(100))
767 self.assertEquals(rawio._reads, 3)
768 self.assertEquals(b"", bufio.read1(100))
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000769 self.assertEquals(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000770 # Invalid args
771 self.assertRaises(ValueError, bufio.read1, -1)
772
773 def test_readinto(self):
774 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
775 bufio = self.tp(rawio)
776 b = bytearray(2)
777 self.assertEquals(bufio.readinto(b), 2)
778 self.assertEquals(b, b"ab")
779 self.assertEquals(bufio.readinto(b), 2)
780 self.assertEquals(b, b"cd")
781 self.assertEquals(bufio.readinto(b), 2)
782 self.assertEquals(b, b"ef")
783 self.assertEquals(bufio.readinto(b), 1)
784 self.assertEquals(b, b"gf")
785 self.assertEquals(bufio.readinto(b), 0)
786 self.assertEquals(b, b"gf")
787
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000788 def test_readlines(self):
789 def bufio():
790 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
791 return self.tp(rawio)
792 self.assertEquals(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
793 self.assertEquals(bufio().readlines(5), [b"abc\n", b"d\n"])
794 self.assertEquals(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
795
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000796 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000797 data = b"abcdefghi"
798 dlen = len(data)
799
800 tests = [
801 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
802 [ 100, [ 3, 3, 3], [ dlen ] ],
803 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
804 ]
805
806 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000807 rawio = self.MockFileIO(data)
808 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000809 pos = 0
810 for nbytes in buf_read_sizes:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000811 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000812 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000813 # this is mildly implementation-dependent
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000814 self.assertEquals(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000815
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000816 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000817 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000818 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
819 bufio = self.tp(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000820
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000821 self.assertEquals(b"abcd", bufio.read(6))
822 self.assertEquals(b"e", bufio.read(1))
823 self.assertEquals(b"fg", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000824 self.assertEquals(b"", bufio.peek(1))
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000825 self.assertTrue(None is bufio.read())
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000826 self.assertEquals(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000827
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000828 def test_read_past_eof(self):
829 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
830 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000831
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000832 self.assertEquals(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000833
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000834 def test_read_all(self):
835 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
836 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000837
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000838 self.assertEquals(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000839
Victor Stinner45df8202010-04-28 22:31:17 +0000840 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +0000841 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000842 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000843 try:
844 # Write out many bytes with exactly the same number of 0's,
845 # 1's... 255's. This will help us check that concurrent reading
846 # doesn't duplicate or forget contents.
847 N = 1000
848 l = list(range(256)) * N
849 random.shuffle(l)
850 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000851 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +0000852 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000853 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000854 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000855 errors = []
856 results = []
857 def f():
858 try:
859 # Intra-buffer read then buffer-flushing read
860 for n in cycle([1, 19]):
861 s = bufio.read(n)
862 if not s:
863 break
864 # list.append() is atomic
865 results.append(s)
866 except Exception as e:
867 errors.append(e)
868 raise
869 threads = [threading.Thread(target=f) for x in range(20)]
870 for t in threads:
871 t.start()
872 time.sleep(0.02) # yield
873 for t in threads:
874 t.join()
875 self.assertFalse(errors,
876 "the following exceptions were caught: %r" % errors)
877 s = b''.join(results)
878 for i in range(256):
879 c = bytes(bytearray([i]))
880 self.assertEqual(s.count(c), N)
881 finally:
882 support.unlink(support.TESTFN)
883
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000884 def test_misbehaved_io(self):
885 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
886 bufio = self.tp(rawio)
887 self.assertRaises(IOError, bufio.seek, 0)
888 self.assertRaises(IOError, bufio.tell)
889
Antoine Pitrou32cfede2010-08-11 13:31:33 +0000890 def test_no_extraneous_read(self):
891 # Issue #9550; when the raw IO object has satisfied the read request,
892 # we should not issue any additional reads, otherwise it may block
893 # (e.g. socket).
894 bufsize = 16
895 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
896 rawio = self.MockRawIO([b"x" * n])
897 bufio = self.tp(rawio, bufsize)
898 self.assertEqual(bufio.read(n), b"x" * n)
899 # Simple case: one raw read is enough to satisfy the request.
900 self.assertEqual(rawio._extraneous_reads, 0,
901 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
902 # A more complex case where two raw reads are needed to satisfy
903 # the request.
904 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
905 bufio = self.tp(rawio, bufsize)
906 self.assertEqual(bufio.read(n), b"x" * n)
907 self.assertEqual(rawio._extraneous_reads, 0,
908 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
909
910
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000911class CBufferedReaderTest(BufferedReaderTest):
912 tp = io.BufferedReader
913
914 def test_constructor(self):
915 BufferedReaderTest.test_constructor(self)
916 # The allocation can succeed on 32-bit builds, e.g. with more
917 # than 2GB RAM and a 64-bit kernel.
918 if sys.maxsize > 0x7FFFFFFF:
919 rawio = self.MockRawIO()
920 bufio = self.tp(rawio)
921 self.assertRaises((OverflowError, MemoryError, ValueError),
922 bufio.__init__, rawio, sys.maxsize)
923
924 def test_initialization(self):
925 rawio = self.MockRawIO([b"abc"])
926 bufio = self.tp(rawio)
927 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
928 self.assertRaises(ValueError, bufio.read)
929 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
930 self.assertRaises(ValueError, bufio.read)
931 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
932 self.assertRaises(ValueError, bufio.read)
933
934 def test_misbehaved_io_read(self):
935 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
936 bufio = self.tp(rawio)
937 # _pyio.BufferedReader seems to implement reading different, so that
938 # checking this is not so easy.
939 self.assertRaises(IOError, bufio.read, 10)
940
941 def test_garbage_collection(self):
942 # C BufferedReader objects are collected.
943 # The Python version has __del__, so it ends into gc.garbage instead
944 rawio = self.FileIO(support.TESTFN, "w+b")
945 f = self.tp(rawio)
946 f.f = f
947 wr = weakref.ref(f)
948 del f
Benjamin Peterson45cec322009-04-24 23:14:50 +0000949 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000950 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000951
952class PyBufferedReaderTest(BufferedReaderTest):
953 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000954
Guido van Rossuma9e20242007-03-08 00:43:48 +0000955
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000956class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
957 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +0000958
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000959 def test_constructor(self):
960 rawio = self.MockRawIO()
961 bufio = self.tp(rawio)
962 bufio.__init__(rawio)
963 bufio.__init__(rawio, buffer_size=1024)
964 bufio.__init__(rawio, buffer_size=16)
965 self.assertEquals(3, bufio.write(b"abc"))
966 bufio.flush()
967 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
968 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
969 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
970 bufio.__init__(rawio)
971 self.assertEquals(3, bufio.write(b"ghi"))
972 bufio.flush()
973 self.assertEquals(b"".join(rawio._write_stack), b"abcghi")
974
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000975 def test_detach_flush(self):
976 raw = self.MockRawIO()
977 buf = self.tp(raw)
978 buf.write(b"howdy!")
979 self.assertFalse(raw._write_stack)
980 buf.detach()
981 self.assertEqual(raw._write_stack, [b"howdy!"])
982
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000983 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000984 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000985 writer = self.MockRawIO()
986 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000987 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000988 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000989
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000990 def test_write_overflow(self):
991 writer = self.MockRawIO()
992 bufio = self.tp(writer, 8)
993 contents = b"abcdefghijklmnop"
994 for n in range(0, len(contents), 3):
995 bufio.write(contents[n:n+3])
996 flushed = b"".join(writer._write_stack)
997 # At least (total - 8) bytes were implicitly flushed, perhaps more
998 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000999 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001000
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001001 def check_writes(self, intermediate_func):
1002 # Lots of writes, test the flushed output is as expected.
1003 contents = bytes(range(256)) * 1000
1004 n = 0
1005 writer = self.MockRawIO()
1006 bufio = self.tp(writer, 13)
1007 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1008 def gen_sizes():
1009 for size in count(1):
1010 for i in range(15):
1011 yield size
1012 sizes = gen_sizes()
1013 while n < len(contents):
1014 size = min(next(sizes), len(contents) - n)
1015 self.assertEquals(bufio.write(contents[n:n+size]), size)
1016 intermediate_func(bufio)
1017 n += size
1018 bufio.flush()
1019 self.assertEquals(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001020
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001021 def test_writes(self):
1022 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001023
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001024 def test_writes_and_flushes(self):
1025 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001026
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001027 def test_writes_and_seeks(self):
1028 def _seekabs(bufio):
1029 pos = bufio.tell()
1030 bufio.seek(pos + 1, 0)
1031 bufio.seek(pos - 1, 0)
1032 bufio.seek(pos, 0)
1033 self.check_writes(_seekabs)
1034 def _seekrel(bufio):
1035 pos = bufio.seek(0, 1)
1036 bufio.seek(+1, 1)
1037 bufio.seek(-1, 1)
1038 bufio.seek(pos, 0)
1039 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001040
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001041 def test_writes_and_truncates(self):
1042 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001043
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001044 def test_write_non_blocking(self):
1045 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001046 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001047
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001048 self.assertEquals(bufio.write(b"abcd"), 4)
1049 self.assertEquals(bufio.write(b"efghi"), 5)
1050 # 1 byte will be written, the rest will be buffered
1051 raw.block_on(b"k")
1052 self.assertEquals(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001053
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001054 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1055 raw.block_on(b"0")
1056 try:
1057 bufio.write(b"opqrwxyz0123456789")
1058 except self.BlockingIOError as e:
1059 written = e.characters_written
1060 else:
1061 self.fail("BlockingIOError should have been raised")
1062 self.assertEquals(written, 16)
1063 self.assertEquals(raw.pop_written(),
1064 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001065
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001066 self.assertEquals(bufio.write(b"ABCDEFGHI"), 9)
1067 s = raw.pop_written()
1068 # Previously buffered bytes were flushed
1069 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001070
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001071 def test_write_and_rewind(self):
1072 raw = io.BytesIO()
1073 bufio = self.tp(raw, 4)
1074 self.assertEqual(bufio.write(b"abcdef"), 6)
1075 self.assertEqual(bufio.tell(), 6)
1076 bufio.seek(0, 0)
1077 self.assertEqual(bufio.write(b"XY"), 2)
1078 bufio.seek(6, 0)
1079 self.assertEqual(raw.getvalue(), b"XYcdef")
1080 self.assertEqual(bufio.write(b"123456"), 6)
1081 bufio.flush()
1082 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001083
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001084 def test_flush(self):
1085 writer = self.MockRawIO()
1086 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001087 bufio.write(b"abc")
1088 bufio.flush()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001089 self.assertEquals(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001090
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001091 def test_destructor(self):
1092 writer = self.MockRawIO()
1093 bufio = self.tp(writer, 8)
1094 bufio.write(b"abc")
1095 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001096 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001097 self.assertEquals(b"abc", writer._write_stack[0])
1098
1099 def test_truncate(self):
1100 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001101 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001102 bufio = self.tp(raw, 8)
1103 bufio.write(b"abcdef")
1104 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001105 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001106 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001107 self.assertEqual(f.read(), b"abc")
1108
Victor Stinner45df8202010-04-28 22:31:17 +00001109 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001110 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001111 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001112 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001113 # Write out many bytes from many threads and test they were
1114 # all flushed.
1115 N = 1000
1116 contents = bytes(range(256)) * N
1117 sizes = cycle([1, 19])
1118 n = 0
1119 queue = deque()
1120 while n < len(contents):
1121 size = next(sizes)
1122 queue.append(contents[n:n+size])
1123 n += size
1124 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001125 # We use a real file object because it allows us to
1126 # exercise situations where the GIL is released before
1127 # writing the buffer to the raw streams. This is in addition
1128 # to concurrency issues due to switching threads in the middle
1129 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001130 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001131 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001132 errors = []
1133 def f():
1134 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001135 while True:
1136 try:
1137 s = queue.popleft()
1138 except IndexError:
1139 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001140 bufio.write(s)
1141 except Exception as e:
1142 errors.append(e)
1143 raise
1144 threads = [threading.Thread(target=f) for x in range(20)]
1145 for t in threads:
1146 t.start()
1147 time.sleep(0.02) # yield
1148 for t in threads:
1149 t.join()
1150 self.assertFalse(errors,
1151 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001152 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001153 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001154 s = f.read()
1155 for i in range(256):
1156 self.assertEquals(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001157 finally:
1158 support.unlink(support.TESTFN)
1159
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001160 def test_misbehaved_io(self):
1161 rawio = self.MisbehavedRawIO()
1162 bufio = self.tp(rawio, 5)
1163 self.assertRaises(IOError, bufio.seek, 0)
1164 self.assertRaises(IOError, bufio.tell)
1165 self.assertRaises(IOError, bufio.write, b"abcdef")
1166
Benjamin Peterson59406a92009-03-26 17:10:29 +00001167 def test_max_buffer_size_deprecation(self):
Florent Xicluna8fbddf12010-03-17 20:29:51 +00001168 with support.check_warnings(("max_buffer_size is deprecated",
1169 DeprecationWarning)):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001170 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001171
1172
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001173class CBufferedWriterTest(BufferedWriterTest):
1174 tp = io.BufferedWriter
1175
1176 def test_constructor(self):
1177 BufferedWriterTest.test_constructor(self)
1178 # The allocation can succeed on 32-bit builds, e.g. with more
1179 # than 2GB RAM and a 64-bit kernel.
1180 if sys.maxsize > 0x7FFFFFFF:
1181 rawio = self.MockRawIO()
1182 bufio = self.tp(rawio)
1183 self.assertRaises((OverflowError, MemoryError, ValueError),
1184 bufio.__init__, rawio, sys.maxsize)
1185
1186 def test_initialization(self):
1187 rawio = self.MockRawIO()
1188 bufio = self.tp(rawio)
1189 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1190 self.assertRaises(ValueError, bufio.write, b"def")
1191 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1192 self.assertRaises(ValueError, bufio.write, b"def")
1193 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1194 self.assertRaises(ValueError, bufio.write, b"def")
1195
1196 def test_garbage_collection(self):
1197 # C BufferedWriter objects are collected, and collecting them flushes
1198 # all data to disk.
1199 # The Python version has __del__, so it ends into gc.garbage instead
1200 rawio = self.FileIO(support.TESTFN, "w+b")
1201 f = self.tp(rawio)
1202 f.write(b"123xxx")
1203 f.x = f
1204 wr = weakref.ref(f)
1205 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001206 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001207 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001208 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001209 self.assertEqual(f.read(), b"123xxx")
1210
1211
1212class PyBufferedWriterTest(BufferedWriterTest):
1213 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001214
Guido van Rossum01a27522007-03-07 01:00:12 +00001215class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001216
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001217 def test_constructor(self):
1218 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001219 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001220
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001221 def test_detach(self):
1222 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1223 self.assertRaises(self.UnsupportedOperation, pair.detach)
1224
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001225 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna8fbddf12010-03-17 20:29:51 +00001226 with support.check_warnings(("max_buffer_size is deprecated",
1227 DeprecationWarning)):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001228 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001229
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001230 def test_constructor_with_not_readable(self):
1231 class NotReadable(MockRawIO):
1232 def readable(self):
1233 return False
1234
1235 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1236
1237 def test_constructor_with_not_writeable(self):
1238 class NotWriteable(MockRawIO):
1239 def writable(self):
1240 return False
1241
1242 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1243
1244 def test_read(self):
1245 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1246
1247 self.assertEqual(pair.read(3), b"abc")
1248 self.assertEqual(pair.read(1), b"d")
1249 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001250 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1251 self.assertEqual(pair.read(None), b"abc")
1252
1253 def test_readlines(self):
1254 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1255 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1256 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1257 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001258
1259 def test_read1(self):
1260 # .read1() is delegated to the underlying reader object, so this test
1261 # can be shallow.
1262 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1263
1264 self.assertEqual(pair.read1(3), b"abc")
1265
1266 def test_readinto(self):
1267 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1268
1269 data = bytearray(5)
1270 self.assertEqual(pair.readinto(data), 5)
1271 self.assertEqual(data, b"abcde")
1272
1273 def test_write(self):
1274 w = self.MockRawIO()
1275 pair = self.tp(self.MockRawIO(), w)
1276
1277 pair.write(b"abc")
1278 pair.flush()
1279 pair.write(b"def")
1280 pair.flush()
1281 self.assertEqual(w._write_stack, [b"abc", b"def"])
1282
1283 def test_peek(self):
1284 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1285
1286 self.assertTrue(pair.peek(3).startswith(b"abc"))
1287 self.assertEqual(pair.read(3), b"abc")
1288
1289 def test_readable(self):
1290 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1291 self.assertTrue(pair.readable())
1292
1293 def test_writeable(self):
1294 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1295 self.assertTrue(pair.writable())
1296
1297 def test_seekable(self):
1298 # BufferedRWPairs are never seekable, even if their readers and writers
1299 # are.
1300 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1301 self.assertFalse(pair.seekable())
1302
1303 # .flush() is delegated to the underlying writer object and has been
1304 # tested in the test_write method.
1305
1306 def test_close_and_closed(self):
1307 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1308 self.assertFalse(pair.closed)
1309 pair.close()
1310 self.assertTrue(pair.closed)
1311
1312 def test_isatty(self):
1313 class SelectableIsAtty(MockRawIO):
1314 def __init__(self, isatty):
1315 MockRawIO.__init__(self)
1316 self._isatty = isatty
1317
1318 def isatty(self):
1319 return self._isatty
1320
1321 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1322 self.assertFalse(pair.isatty())
1323
1324 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1325 self.assertTrue(pair.isatty())
1326
1327 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1328 self.assertTrue(pair.isatty())
1329
1330 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1331 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001332
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001333class CBufferedRWPairTest(BufferedRWPairTest):
1334 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001335
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001336class PyBufferedRWPairTest(BufferedRWPairTest):
1337 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001338
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001339
1340class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1341 read_mode = "rb+"
1342 write_mode = "wb+"
1343
1344 def test_constructor(self):
1345 BufferedReaderTest.test_constructor(self)
1346 BufferedWriterTest.test_constructor(self)
1347
1348 def test_read_and_write(self):
1349 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001350 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001351
1352 self.assertEqual(b"as", rw.read(2))
1353 rw.write(b"ddd")
1354 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001355 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001356 self.assertEqual(b"ghjk", rw.read())
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001357 self.assertEquals(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001358
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001359 def test_seek_and_tell(self):
1360 raw = self.BytesIO(b"asdfghjkl")
1361 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001362
1363 self.assertEquals(b"as", rw.read(2))
1364 self.assertEquals(2, rw.tell())
1365 rw.seek(0, 0)
1366 self.assertEquals(b"asdf", rw.read(4))
1367
1368 rw.write(b"asdf")
1369 rw.seek(0, 0)
1370 self.assertEquals(b"asdfasdfl", rw.read())
1371 self.assertEquals(9, rw.tell())
1372 rw.seek(-4, 2)
1373 self.assertEquals(5, rw.tell())
1374 rw.seek(2, 1)
1375 self.assertEquals(7, rw.tell())
1376 self.assertEquals(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001377 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001378
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001379 def check_flush_and_read(self, read_func):
1380 raw = self.BytesIO(b"abcdefghi")
1381 bufio = self.tp(raw)
1382
1383 self.assertEquals(b"ab", read_func(bufio, 2))
1384 bufio.write(b"12")
1385 self.assertEquals(b"ef", read_func(bufio, 2))
1386 self.assertEquals(6, bufio.tell())
1387 bufio.flush()
1388 self.assertEquals(6, bufio.tell())
1389 self.assertEquals(b"ghi", read_func(bufio))
1390 raw.seek(0, 0)
1391 raw.write(b"XYZ")
1392 # flush() resets the read buffer
1393 bufio.flush()
1394 bufio.seek(0, 0)
1395 self.assertEquals(b"XYZ", read_func(bufio, 3))
1396
1397 def test_flush_and_read(self):
1398 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1399
1400 def test_flush_and_readinto(self):
1401 def _readinto(bufio, n=-1):
1402 b = bytearray(n if n >= 0 else 9999)
1403 n = bufio.readinto(b)
1404 return bytes(b[:n])
1405 self.check_flush_and_read(_readinto)
1406
1407 def test_flush_and_peek(self):
1408 def _peek(bufio, n=-1):
1409 # This relies on the fact that the buffer can contain the whole
1410 # raw stream, otherwise peek() can return less.
1411 b = bufio.peek(n)
1412 if n != -1:
1413 b = b[:n]
1414 bufio.seek(len(b), 1)
1415 return b
1416 self.check_flush_and_read(_peek)
1417
1418 def test_flush_and_write(self):
1419 raw = self.BytesIO(b"abcdefghi")
1420 bufio = self.tp(raw)
1421
1422 bufio.write(b"123")
1423 bufio.flush()
1424 bufio.write(b"45")
1425 bufio.flush()
1426 bufio.seek(0, 0)
1427 self.assertEquals(b"12345fghi", raw.getvalue())
1428 self.assertEquals(b"12345fghi", bufio.read())
1429
1430 def test_threads(self):
1431 BufferedReaderTest.test_threads(self)
1432 BufferedWriterTest.test_threads(self)
1433
1434 def test_writes_and_peek(self):
1435 def _peek(bufio):
1436 bufio.peek(1)
1437 self.check_writes(_peek)
1438 def _peek(bufio):
1439 pos = bufio.tell()
1440 bufio.seek(-1, 1)
1441 bufio.peek(1)
1442 bufio.seek(pos, 0)
1443 self.check_writes(_peek)
1444
1445 def test_writes_and_reads(self):
1446 def _read(bufio):
1447 bufio.seek(-1, 1)
1448 bufio.read(1)
1449 self.check_writes(_read)
1450
1451 def test_writes_and_read1s(self):
1452 def _read1(bufio):
1453 bufio.seek(-1, 1)
1454 bufio.read1(1)
1455 self.check_writes(_read1)
1456
1457 def test_writes_and_readintos(self):
1458 def _read(bufio):
1459 bufio.seek(-1, 1)
1460 bufio.readinto(bytearray(1))
1461 self.check_writes(_read)
1462
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001463 def test_write_after_readahead(self):
1464 # Issue #6629: writing after the buffer was filled by readahead should
1465 # first rewind the raw stream.
1466 for overwrite_size in [1, 5]:
1467 raw = self.BytesIO(b"A" * 10)
1468 bufio = self.tp(raw, 4)
1469 # Trigger readahead
1470 self.assertEqual(bufio.read(1), b"A")
1471 self.assertEqual(bufio.tell(), 1)
1472 # Overwriting should rewind the raw stream if it needs so
1473 bufio.write(b"B" * overwrite_size)
1474 self.assertEqual(bufio.tell(), overwrite_size + 1)
1475 # If the write size was smaller than the buffer size, flush() and
1476 # check that rewind happens.
1477 bufio.flush()
1478 self.assertEqual(bufio.tell(), overwrite_size + 1)
1479 s = raw.getvalue()
1480 self.assertEqual(s,
1481 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1482
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001483 def test_truncate_after_read_or_write(self):
1484 raw = self.BytesIO(b"A" * 10)
1485 bufio = self.tp(raw, 100)
1486 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1487 self.assertEqual(bufio.truncate(), 2)
1488 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1489 self.assertEqual(bufio.truncate(), 4)
1490
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001491 def test_misbehaved_io(self):
1492 BufferedReaderTest.test_misbehaved_io(self)
1493 BufferedWriterTest.test_misbehaved_io(self)
1494
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001495 # You can't construct a BufferedRandom over a non-seekable stream.
1496 test_unseekable = None
1497
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001498class CBufferedRandomTest(BufferedRandomTest):
1499 tp = io.BufferedRandom
1500
1501 def test_constructor(self):
1502 BufferedRandomTest.test_constructor(self)
1503 # The allocation can succeed on 32-bit builds, e.g. with more
1504 # than 2GB RAM and a 64-bit kernel.
1505 if sys.maxsize > 0x7FFFFFFF:
1506 rawio = self.MockRawIO()
1507 bufio = self.tp(rawio)
1508 self.assertRaises((OverflowError, MemoryError, ValueError),
1509 bufio.__init__, rawio, sys.maxsize)
1510
1511 def test_garbage_collection(self):
1512 CBufferedReaderTest.test_garbage_collection(self)
1513 CBufferedWriterTest.test_garbage_collection(self)
1514
1515class PyBufferedRandomTest(BufferedRandomTest):
1516 tp = pyio.BufferedRandom
1517
1518
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001519# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1520# properties:
1521# - A single output character can correspond to many bytes of input.
1522# - The number of input bytes to complete the character can be
1523# undetermined until the last input byte is received.
1524# - The number of input bytes can vary depending on previous input.
1525# - A single input byte can correspond to many characters of output.
1526# - The number of output characters can be undetermined until the
1527# last input byte is received.
1528# - The number of output characters can vary depending on previous input.
1529
1530class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1531 """
1532 For testing seek/tell behavior with a stateful, buffering decoder.
1533
1534 Input is a sequence of words. Words may be fixed-length (length set
1535 by input) or variable-length (period-terminated). In variable-length
1536 mode, extra periods are ignored. Possible words are:
1537 - 'i' followed by a number sets the input length, I (maximum 99).
1538 When I is set to 0, words are space-terminated.
1539 - 'o' followed by a number sets the output length, O (maximum 99).
1540 - Any other word is converted into a word followed by a period on
1541 the output. The output word consists of the input word truncated
1542 or padded out with hyphens to make its length equal to O. If O
1543 is 0, the word is output verbatim without truncating or padding.
1544 I and O are initially set to 1. When I changes, any buffered input is
1545 re-scanned according to the new I. EOF also terminates the last word.
1546 """
1547
1548 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001549 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001550 self.reset()
1551
1552 def __repr__(self):
1553 return '<SID %x>' % id(self)
1554
1555 def reset(self):
1556 self.i = 1
1557 self.o = 1
1558 self.buffer = bytearray()
1559
1560 def getstate(self):
1561 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1562 return bytes(self.buffer), i*100 + o
1563
1564 def setstate(self, state):
1565 buffer, io = state
1566 self.buffer = bytearray(buffer)
1567 i, o = divmod(io, 100)
1568 self.i, self.o = i ^ 1, o ^ 1
1569
1570 def decode(self, input, final=False):
1571 output = ''
1572 for b in input:
1573 if self.i == 0: # variable-length, terminated with period
1574 if b == ord('.'):
1575 if self.buffer:
1576 output += self.process_word()
1577 else:
1578 self.buffer.append(b)
1579 else: # fixed-length, terminate after self.i bytes
1580 self.buffer.append(b)
1581 if len(self.buffer) == self.i:
1582 output += self.process_word()
1583 if final and self.buffer: # EOF terminates the last word
1584 output += self.process_word()
1585 return output
1586
1587 def process_word(self):
1588 output = ''
1589 if self.buffer[0] == ord('i'):
1590 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1591 elif self.buffer[0] == ord('o'):
1592 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1593 else:
1594 output = self.buffer.decode('ascii')
1595 if len(output) < self.o:
1596 output += '-'*self.o # pad out with hyphens
1597 if self.o:
1598 output = output[:self.o] # truncate to output length
1599 output += '.'
1600 self.buffer = bytearray()
1601 return output
1602
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001603 codecEnabled = False
1604
1605 @classmethod
1606 def lookupTestDecoder(cls, name):
1607 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001608 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001609 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001610 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001611 incrementalencoder=None,
1612 streamreader=None, streamwriter=None,
1613 incrementaldecoder=cls)
1614
1615# Register the previous decoder for testing.
1616# Disabled by default, tests will enable it.
1617codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1618
1619
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001620class StatefulIncrementalDecoderTest(unittest.TestCase):
1621 """
1622 Make sure the StatefulIncrementalDecoder actually works.
1623 """
1624
1625 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001626 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001627 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001628 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001629 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001630 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001631 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001632 # I=0, O=6 (variable-length input, fixed-length output)
1633 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1634 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001635 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001636 # I=6, O=3 (fixed-length input > fixed-length output)
1637 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1638 # I=0, then 3; O=29, then 15 (with longer output)
1639 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1640 'a----------------------------.' +
1641 'b----------------------------.' +
1642 'cde--------------------------.' +
1643 'abcdefghijabcde.' +
1644 'a.b------------.' +
1645 '.c.------------.' +
1646 'd.e------------.' +
1647 'k--------------.' +
1648 'l--------------.' +
1649 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001650 ]
1651
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001652 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001653 # Try a few one-shot test cases.
1654 for input, eof, output in self.test_cases:
1655 d = StatefulIncrementalDecoder()
1656 self.assertEquals(d.decode(input, eof), output)
1657
1658 # Also test an unfinished decode, followed by forcing EOF.
1659 d = StatefulIncrementalDecoder()
1660 self.assertEquals(d.decode(b'oiabcd'), '')
1661 self.assertEquals(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001662
1663class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001664
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001665 def setUp(self):
1666 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1667 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001668 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001669
Guido van Rossumd0712812007-04-11 16:32:43 +00001670 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001671 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001672
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001673 def test_constructor(self):
1674 r = self.BytesIO(b"\xc3\xa9\n\n")
1675 b = self.BufferedReader(r, 1000)
1676 t = self.TextIOWrapper(b)
1677 t.__init__(b, encoding="latin1", newline="\r\n")
1678 self.assertEquals(t.encoding, "latin1")
1679 self.assertEquals(t.line_buffering, False)
1680 t.__init__(b, encoding="utf8", line_buffering=True)
1681 self.assertEquals(t.encoding, "utf8")
1682 self.assertEquals(t.line_buffering, True)
1683 self.assertEquals("\xe9\n", t.readline())
1684 self.assertRaises(TypeError, t.__init__, b, newline=42)
1685 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1686
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001687 def test_detach(self):
1688 r = self.BytesIO()
1689 b = self.BufferedWriter(r)
1690 t = self.TextIOWrapper(b)
1691 self.assertIs(t.detach(), b)
1692
1693 t = self.TextIOWrapper(b, encoding="ascii")
1694 t.write("howdy")
1695 self.assertFalse(r.getvalue())
1696 t.detach()
1697 self.assertEqual(r.getvalue(), b"howdy")
1698 self.assertRaises(ValueError, t.detach)
1699
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001700 def test_repr(self):
1701 raw = self.BytesIO("hello".encode("utf-8"))
1702 b = self.BufferedReader(raw)
1703 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001704 modname = self.TextIOWrapper.__module__
1705 self.assertEqual(repr(t),
1706 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1707 raw.name = "dummy"
1708 self.assertEqual(repr(t),
1709 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1710 raw.name = b"dummy"
1711 self.assertEqual(repr(t),
1712 "<%s.TextIOWrapper name=b'dummy' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001713
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001714 def test_line_buffering(self):
1715 r = self.BytesIO()
1716 b = self.BufferedWriter(r, 1000)
1717 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001718 t.write("X")
1719 self.assertEquals(r.getvalue(), b"") # No flush happened
1720 t.write("Y\nZ")
1721 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
1722 t.write("A\rB")
1723 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
1724
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001725 def test_encoding(self):
1726 # Check the encoding attribute is always set, and valid
1727 b = self.BytesIO()
1728 t = self.TextIOWrapper(b, encoding="utf8")
1729 self.assertEqual(t.encoding, "utf8")
1730 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001731 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001732 codecs.lookup(t.encoding)
1733
1734 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001735 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001736 b = self.BytesIO(b"abc\n\xff\n")
1737 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001738 self.assertRaises(UnicodeError, t.read)
1739 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001740 b = self.BytesIO(b"abc\n\xff\n")
1741 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001742 self.assertRaises(UnicodeError, t.read)
1743 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001744 b = self.BytesIO(b"abc\n\xff\n")
1745 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001746 self.assertEquals(t.read(), "abc\n\n")
1747 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001748 b = self.BytesIO(b"abc\n\xff\n")
1749 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001750 self.assertEquals(t.read(), "abc\n\ufffd\n")
1751
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001752 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001753 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001754 b = self.BytesIO()
1755 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001756 self.assertRaises(UnicodeError, t.write, "\xff")
1757 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001758 b = self.BytesIO()
1759 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001760 self.assertRaises(UnicodeError, t.write, "\xff")
1761 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001762 b = self.BytesIO()
1763 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001764 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001765 t.write("abc\xffdef\n")
1766 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001767 self.assertEquals(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001768 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001769 b = self.BytesIO()
1770 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001771 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001772 t.write("abc\xffdef\n")
1773 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001774 self.assertEquals(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001775
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001776 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001777 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1778
1779 tests = [
1780 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001781 [ '', input_lines ],
1782 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1783 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1784 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001785 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001786 encodings = (
1787 'utf-8', 'latin-1',
1788 'utf-16', 'utf-16-le', 'utf-16-be',
1789 'utf-32', 'utf-32-le', 'utf-32-be',
1790 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001791
Guido van Rossum8358db22007-08-18 21:39:55 +00001792 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001793 # character in TextIOWrapper._pending_line.
1794 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001795 # XXX: str.encode() should return bytes
1796 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001797 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001798 for bufsize in range(1, 10):
1799 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001800 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1801 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001802 encoding=encoding)
1803 if do_reads:
1804 got_lines = []
1805 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001806 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001807 if c2 == '':
1808 break
1809 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001810 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001811 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001812 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001813
1814 for got_line, exp_line in zip(got_lines, exp_lines):
1815 self.assertEquals(got_line, exp_line)
1816 self.assertEquals(len(got_lines), len(exp_lines))
1817
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001818 def test_newlines_input(self):
1819 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001820 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1821 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001822 (None, normalized.decode("ascii").splitlines(True)),
1823 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001824 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1825 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1826 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001827 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001828 buf = self.BytesIO(testdata)
1829 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +00001830 self.assertEquals(txt.readlines(), expected)
1831 txt.seek(0)
1832 self.assertEquals(txt.read(), "".join(expected))
1833
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001834 def test_newlines_output(self):
1835 testdict = {
1836 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1837 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1838 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1839 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1840 }
1841 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1842 for newline, expected in tests:
1843 buf = self.BytesIO()
1844 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1845 txt.write("AAA\nB")
1846 txt.write("BB\nCCC\n")
1847 txt.write("X\rY\r\nZ")
1848 txt.flush()
1849 self.assertEquals(buf.closed, False)
1850 self.assertEquals(buf.getvalue(), expected)
1851
1852 def test_destructor(self):
1853 l = []
1854 base = self.BytesIO
1855 class MyBytesIO(base):
1856 def close(self):
1857 l.append(self.getvalue())
1858 base.close(self)
1859 b = MyBytesIO()
1860 t = self.TextIOWrapper(b, encoding="ascii")
1861 t.write("abc")
1862 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001863 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001864 self.assertEquals([b"abc"], l)
1865
1866 def test_override_destructor(self):
1867 record = []
1868 class MyTextIO(self.TextIOWrapper):
1869 def __del__(self):
1870 record.append(1)
1871 try:
1872 f = super().__del__
1873 except AttributeError:
1874 pass
1875 else:
1876 f()
1877 def close(self):
1878 record.append(2)
1879 super().close()
1880 def flush(self):
1881 record.append(3)
1882 super().flush()
1883 b = self.BytesIO()
1884 t = MyTextIO(b, encoding="ascii")
1885 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001886 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001887 self.assertEqual(record, [1, 2, 3])
1888
1889 def test_error_through_destructor(self):
1890 # Test that the exception state is not modified by a destructor,
1891 # even if close() fails.
1892 rawio = self.CloseFailureIO()
1893 def f():
1894 self.TextIOWrapper(rawio).xyzzy
1895 with support.captured_output("stderr") as s:
1896 self.assertRaises(AttributeError, f)
1897 s = s.getvalue().strip()
1898 if s:
1899 # The destructor *may* have printed an unraisable error, check it
1900 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001901 self.assertTrue(s.startswith("Exception IOError: "), s)
1902 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00001903
Guido van Rossum9b76da62007-04-11 01:09:03 +00001904 # Systematic tests of the text I/O API
1905
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001906 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001907 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1908 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001909 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001910 f._CHUNK_SIZE = chunksize
1911 self.assertEquals(f.write("abc"), 3)
1912 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001913 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001914 f._CHUNK_SIZE = chunksize
1915 self.assertEquals(f.tell(), 0)
1916 self.assertEquals(f.read(), "abc")
1917 cookie = f.tell()
1918 self.assertEquals(f.seek(0), 0)
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001919 self.assertEquals(f.read(None), "abc")
1920 f.seek(0)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001921 self.assertEquals(f.read(2), "ab")
1922 self.assertEquals(f.read(1), "c")
1923 self.assertEquals(f.read(1), "")
1924 self.assertEquals(f.read(), "")
1925 self.assertEquals(f.tell(), cookie)
1926 self.assertEquals(f.seek(0), 0)
1927 self.assertEquals(f.seek(0, 2), cookie)
1928 self.assertEquals(f.write("def"), 3)
1929 self.assertEquals(f.seek(cookie), cookie)
1930 self.assertEquals(f.read(), "def")
1931 if enc.startswith("utf"):
1932 self.multi_line_test(f, enc)
1933 f.close()
1934
1935 def multi_line_test(self, f, enc):
1936 f.seek(0)
1937 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001938 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001939 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00001940 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001941 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00001942 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001943 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001944 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001945 wlines.append((f.tell(), line))
1946 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001947 f.seek(0)
1948 rlines = []
1949 while True:
1950 pos = f.tell()
1951 line = f.readline()
1952 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00001953 break
1954 rlines.append((pos, line))
Guido van Rossum9b76da62007-04-11 01:09:03 +00001955 self.assertEquals(rlines, wlines)
1956
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001957 def test_telling(self):
1958 f = self.open(support.TESTFN, "w+", encoding="utf8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001959 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001960 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001961 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001962 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001963 p2 = f.tell()
1964 f.seek(0)
1965 self.assertEquals(f.tell(), p0)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001966 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001967 self.assertEquals(f.tell(), p1)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001968 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001969 self.assertEquals(f.tell(), p2)
1970 f.seek(0)
1971 for line in f:
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001972 self.assertEquals(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001973 self.assertRaises(IOError, f.tell)
1974 self.assertEquals(f.tell(), p2)
1975 f.close()
1976
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001977 def test_seeking(self):
1978 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00001979 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001980 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00001981 prefix = bytes(u_prefix.encode("utf-8"))
1982 self.assertEquals(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001983 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00001984 suffix = bytes(u_suffix.encode("utf-8"))
1985 line = prefix + suffix
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001986 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001987 f.write(line*2)
1988 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001989 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001990 s = f.read(prefix_size)
Guido van Rossum98297ee2007-11-06 21:34:58 +00001991 self.assertEquals(s, str(prefix, "ascii"))
Guido van Rossumd76e7792007-04-17 02:38:04 +00001992 self.assertEquals(f.tell(), prefix_size)
1993 self.assertEquals(f.readline(), u_suffix)
1994
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001995 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00001996 # Regression test for a specific bug
1997 data = b'\xe0\xbf\xbf\n'
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001998 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001999 f.write(data)
2000 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002001 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00002002 f._CHUNK_SIZE # Just test that it exists
2003 f._CHUNK_SIZE = 2
2004 f.readline()
2005 f.tell()
2006
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002007 def test_seek_and_tell(self):
2008 #Test seek/tell using the StatefulIncrementalDecoder.
2009 # Make test faster by doing smaller seeks
2010 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002011
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002012 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002013 """Tell/seek to various points within a data stream and ensure
2014 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002015 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002016 f.write(data)
2017 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002018 f = self.open(support.TESTFN, encoding='test_decoder')
2019 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002020 decoded = f.read()
2021 f.close()
2022
Neal Norwitze2b07052008-03-18 19:52:05 +00002023 for i in range(min_pos, len(decoded) + 1): # seek positions
2024 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002025 f = self.open(support.TESTFN, encoding='test_decoder')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002026 self.assertEquals(f.read(i), decoded[:i])
2027 cookie = f.tell()
2028 self.assertEquals(f.read(j), decoded[i:i + j])
2029 f.seek(cookie)
2030 self.assertEquals(f.read(), decoded[i:])
2031 f.close()
2032
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002033 # Enable the test decoder.
2034 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002035
2036 # Run the tests.
2037 try:
2038 # Try each test case.
2039 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002040 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002041
2042 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002043 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2044 offset = CHUNK_SIZE - len(input)//2
2045 prefix = b'.'*offset
2046 # Don't bother seeking into the prefix (takes too long).
2047 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002048 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002049
2050 # Ensure our test decoder won't interfere with subsequent tests.
2051 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002052 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002053
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002054 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002055 data = "1234567890"
2056 tests = ("utf-16",
2057 "utf-16-le",
2058 "utf-16-be",
2059 "utf-32",
2060 "utf-32-le",
2061 "utf-32-be")
2062 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002063 buf = self.BytesIO()
2064 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002065 # Check if the BOM is written only once (see issue1753).
2066 f.write(data)
2067 f.write(data)
2068 f.seek(0)
2069 self.assertEquals(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002070 f.seek(0)
2071 self.assertEquals(f.read(), data * 2)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002072 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
2073
Benjamin Petersona1b49012009-03-31 23:11:32 +00002074 def test_unreadable(self):
2075 class UnReadable(self.BytesIO):
2076 def readable(self):
2077 return False
2078 txt = self.TextIOWrapper(UnReadable())
2079 self.assertRaises(IOError, txt.read)
2080
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002081 def test_read_one_by_one(self):
2082 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002083 reads = ""
2084 while True:
2085 c = txt.read(1)
2086 if not c:
2087 break
2088 reads += c
2089 self.assertEquals(reads, "AA\nBB")
2090
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002091 def test_readlines(self):
2092 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2093 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2094 txt.seek(0)
2095 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2096 txt.seek(0)
2097 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2098
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002099 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002100 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002101 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002102 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002103 reads = ""
2104 while True:
2105 c = txt.read(128)
2106 if not c:
2107 break
2108 reads += c
2109 self.assertEquals(reads, "A"*127+"\nB")
2110
2111 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002112 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002113
2114 # read one char at a time
2115 reads = ""
2116 while True:
2117 c = txt.read(1)
2118 if not c:
2119 break
2120 reads += c
2121 self.assertEquals(reads, self.normalized)
2122
2123 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002124 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002125 txt._CHUNK_SIZE = 4
2126
2127 reads = ""
2128 while True:
2129 c = txt.read(4)
2130 if not c:
2131 break
2132 reads += c
2133 self.assertEquals(reads, self.normalized)
2134
2135 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002136 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002137 txt._CHUNK_SIZE = 4
2138
2139 reads = txt.read(4)
2140 reads += txt.read(4)
2141 reads += txt.readline()
2142 reads += txt.readline()
2143 reads += txt.readline()
2144 self.assertEquals(reads, self.normalized)
2145
2146 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002147 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002148 txt._CHUNK_SIZE = 4
2149
2150 reads = txt.read(4)
2151 reads += txt.read()
2152 self.assertEquals(reads, self.normalized)
2153
2154 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002155 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002156 txt._CHUNK_SIZE = 4
2157
2158 reads = txt.read(4)
2159 pos = txt.tell()
2160 txt.seek(0)
2161 txt.seek(pos)
2162 self.assertEquals(txt.read(4), "BBB\n")
2163
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002164 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002165 buffer = self.BytesIO(self.testdata)
2166 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002167
2168 self.assertEqual(buffer.seekable(), txt.seekable())
2169
Antoine Pitroue4501852009-05-14 18:55:55 +00002170 def test_append_bom(self):
2171 # The BOM is not written again when appending to a non-empty file
2172 filename = support.TESTFN
2173 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2174 with self.open(filename, 'w', encoding=charset) as f:
2175 f.write('aaa')
2176 pos = f.tell()
2177 with self.open(filename, 'rb') as f:
2178 self.assertEquals(f.read(), 'aaa'.encode(charset))
2179
2180 with self.open(filename, 'a', encoding=charset) as f:
2181 f.write('xxx')
2182 with self.open(filename, 'rb') as f:
2183 self.assertEquals(f.read(), 'aaaxxx'.encode(charset))
2184
2185 def test_seek_bom(self):
2186 # Same test, but when seeking manually
2187 filename = support.TESTFN
2188 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2189 with self.open(filename, 'w', encoding=charset) as f:
2190 f.write('aaa')
2191 pos = f.tell()
2192 with self.open(filename, 'r+', encoding=charset) as f:
2193 f.seek(pos)
2194 f.write('zzz')
2195 f.seek(0)
2196 f.write('bbb')
2197 with self.open(filename, 'rb') as f:
2198 self.assertEquals(f.read(), 'bbbzzz'.encode(charset))
2199
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002200 def test_errors_property(self):
2201 with self.open(support.TESTFN, "w") as f:
2202 self.assertEqual(f.errors, "strict")
2203 with self.open(support.TESTFN, "w", errors="replace") as f:
2204 self.assertEqual(f.errors, "replace")
2205
Victor Stinner45df8202010-04-28 22:31:17 +00002206 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002207 def test_threads_write(self):
2208 # Issue6750: concurrent writes could duplicate data
2209 event = threading.Event()
2210 with self.open(support.TESTFN, "w", buffering=1) as f:
2211 def run(n):
2212 text = "Thread%03d\n" % n
2213 event.wait()
2214 f.write(text)
2215 threads = [threading.Thread(target=lambda n=x: run(n))
2216 for x in range(20)]
2217 for t in threads:
2218 t.start()
2219 time.sleep(0.02)
2220 event.set()
2221 for t in threads:
2222 t.join()
2223 with self.open(support.TESTFN) as f:
2224 content = f.read()
2225 for n in range(20):
2226 self.assertEquals(content.count("Thread%03d\n" % n), 1)
2227
Antoine Pitrou6be88762010-05-03 16:48:20 +00002228 def test_flush_error_on_close(self):
2229 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2230 def bad_flush():
2231 raise IOError()
2232 txt.flush = bad_flush
2233 self.assertRaises(IOError, txt.close) # exception not swallowed
2234
2235 def test_multi_close(self):
2236 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2237 txt.close()
2238 txt.close()
2239 txt.close()
2240 self.assertRaises(ValueError, txt.flush)
2241
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002242 def test_unseekable(self):
2243 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2244 self.assertRaises(self.UnsupportedOperation, txt.tell)
2245 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2246
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002247class CTextIOWrapperTest(TextIOWrapperTest):
2248
2249 def test_initialization(self):
2250 r = self.BytesIO(b"\xc3\xa9\n\n")
2251 b = self.BufferedReader(r, 1000)
2252 t = self.TextIOWrapper(b)
2253 self.assertRaises(TypeError, t.__init__, b, newline=42)
2254 self.assertRaises(ValueError, t.read)
2255 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2256 self.assertRaises(ValueError, t.read)
2257
2258 def test_garbage_collection(self):
2259 # C TextIOWrapper objects are collected, and collecting them flushes
2260 # all data to disk.
2261 # The Python version has __del__, so it ends in gc.garbage instead.
2262 rawio = io.FileIO(support.TESTFN, "wb")
2263 b = self.BufferedWriter(rawio)
2264 t = self.TextIOWrapper(b, encoding="ascii")
2265 t.write("456def")
2266 t.x = t
2267 wr = weakref.ref(t)
2268 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002269 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002270 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002271 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002272 self.assertEqual(f.read(), b"456def")
2273
2274class PyTextIOWrapperTest(TextIOWrapperTest):
2275 pass
2276
2277
2278class IncrementalNewlineDecoderTest(unittest.TestCase):
2279
2280 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002281 # UTF-8 specific tests for a newline decoder
2282 def _check_decode(b, s, **kwargs):
2283 # We exercise getstate() / setstate() as well as decode()
2284 state = decoder.getstate()
2285 self.assertEquals(decoder.decode(b, **kwargs), s)
2286 decoder.setstate(state)
2287 self.assertEquals(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002288
Antoine Pitrou180a3362008-12-14 16:36:46 +00002289 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002290
Antoine Pitrou180a3362008-12-14 16:36:46 +00002291 _check_decode(b'\xe8', "")
2292 _check_decode(b'\xa2', "")
2293 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002294
Antoine Pitrou180a3362008-12-14 16:36:46 +00002295 _check_decode(b'\xe8', "")
2296 _check_decode(b'\xa2', "")
2297 _check_decode(b'\x88', "\u8888")
2298
2299 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002300 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2301
Antoine Pitrou180a3362008-12-14 16:36:46 +00002302 decoder.reset()
2303 _check_decode(b'\n', "\n")
2304 _check_decode(b'\r', "")
2305 _check_decode(b'', "\n", final=True)
2306 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002307
Antoine Pitrou180a3362008-12-14 16:36:46 +00002308 _check_decode(b'\r', "")
2309 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002310
Antoine Pitrou180a3362008-12-14 16:36:46 +00002311 _check_decode(b'\r\r\n', "\n\n")
2312 _check_decode(b'\r', "")
2313 _check_decode(b'\r', "\n")
2314 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002315
Antoine Pitrou180a3362008-12-14 16:36:46 +00002316 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2317 _check_decode(b'\xe8\xa2\x88', "\u8888")
2318 _check_decode(b'\n', "\n")
2319 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2320 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002321
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002322 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002323 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002324 if encoding is not None:
2325 encoder = codecs.getincrementalencoder(encoding)()
2326 def _decode_bytewise(s):
2327 # Decode one byte at a time
2328 for b in encoder.encode(s):
2329 result.append(decoder.decode(bytes([b])))
2330 else:
2331 encoder = None
2332 def _decode_bytewise(s):
2333 # Decode one char at a time
2334 for c in s:
2335 result.append(decoder.decode(c))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002336 self.assertEquals(decoder.newlines, None)
2337 _decode_bytewise("abc\n\r")
2338 self.assertEquals(decoder.newlines, '\n')
2339 _decode_bytewise("\nabc")
2340 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2341 _decode_bytewise("abc\r")
2342 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2343 _decode_bytewise("abc")
2344 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
2345 _decode_bytewise("abc\r")
2346 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
2347 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002348 input = "abc"
2349 if encoder is not None:
2350 encoder.reset()
2351 input = encoder.encode(input)
2352 self.assertEquals(decoder.decode(input), "abc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002353 self.assertEquals(decoder.newlines, None)
2354
2355 def test_newline_decoder(self):
2356 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002357 # None meaning the IncrementalNewlineDecoder takes unicode input
2358 # rather than bytes input
2359 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002360 'utf-16', 'utf-16-le', 'utf-16-be',
2361 'utf-32', 'utf-32-le', 'utf-32-be',
2362 )
2363 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002364 decoder = enc and codecs.getincrementaldecoder(enc)()
2365 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2366 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002367 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002368 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2369 self.check_newline_decoding_utf8(decoder)
2370
Antoine Pitrou66913e22009-03-06 23:40:56 +00002371 def test_newline_bytes(self):
2372 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2373 def _check(dec):
2374 self.assertEquals(dec.newlines, None)
2375 self.assertEquals(dec.decode("\u0D00"), "\u0D00")
2376 self.assertEquals(dec.newlines, None)
2377 self.assertEquals(dec.decode("\u0A00"), "\u0A00")
2378 self.assertEquals(dec.newlines, None)
2379 dec = self.IncrementalNewlineDecoder(None, translate=False)
2380 _check(dec)
2381 dec = self.IncrementalNewlineDecoder(None, translate=True)
2382 _check(dec)
2383
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002384class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2385 pass
2386
2387class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2388 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002389
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002390
Guido van Rossum01a27522007-03-07 01:00:12 +00002391# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002392
Guido van Rossum5abbf752007-08-27 17:39:33 +00002393class MiscIOTest(unittest.TestCase):
2394
Barry Warsaw40e82462008-11-20 20:14:50 +00002395 def tearDown(self):
2396 support.unlink(support.TESTFN)
2397
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002398 def test___all__(self):
2399 for name in self.io.__all__:
2400 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002401 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002402 if name == "open":
2403 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002404 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002405 self.assertTrue(issubclass(obj, Exception), name)
2406 elif not name.startswith("SEEK_"):
2407 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002408
Barry Warsaw40e82462008-11-20 20:14:50 +00002409 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002410 f = self.open(support.TESTFN, "wb", buffering=0)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002411 self.assertEquals(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002412 f.close()
2413
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002414 f = self.open(support.TESTFN, "U")
Barry Warsaw40e82462008-11-20 20:14:50 +00002415 self.assertEquals(f.name, support.TESTFN)
2416 self.assertEquals(f.buffer.name, support.TESTFN)
2417 self.assertEquals(f.buffer.raw.name, support.TESTFN)
2418 self.assertEquals(f.mode, "U")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002419 self.assertEquals(f.buffer.mode, "rb")
2420 self.assertEquals(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002421 f.close()
2422
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002423 f = self.open(support.TESTFN, "w+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002424 self.assertEquals(f.mode, "w+")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002425 self.assertEquals(f.buffer.mode, "rb+") # Does it really matter?
2426 self.assertEquals(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002427
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002428 g = self.open(f.fileno(), "wb", closefd=False)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002429 self.assertEquals(g.mode, "wb")
2430 self.assertEquals(g.raw.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002431 self.assertEquals(g.name, f.fileno())
2432 self.assertEquals(g.raw.name, f.fileno())
2433 f.close()
2434 g.close()
2435
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002436 def test_io_after_close(self):
2437 for kwargs in [
2438 {"mode": "w"},
2439 {"mode": "wb"},
2440 {"mode": "w", "buffering": 1},
2441 {"mode": "w", "buffering": 2},
2442 {"mode": "wb", "buffering": 0},
2443 {"mode": "r"},
2444 {"mode": "rb"},
2445 {"mode": "r", "buffering": 1},
2446 {"mode": "r", "buffering": 2},
2447 {"mode": "rb", "buffering": 0},
2448 {"mode": "w+"},
2449 {"mode": "w+b"},
2450 {"mode": "w+", "buffering": 1},
2451 {"mode": "w+", "buffering": 2},
2452 {"mode": "w+b", "buffering": 0},
2453 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002454 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002455 f.close()
2456 self.assertRaises(ValueError, f.flush)
2457 self.assertRaises(ValueError, f.fileno)
2458 self.assertRaises(ValueError, f.isatty)
2459 self.assertRaises(ValueError, f.__iter__)
2460 if hasattr(f, "peek"):
2461 self.assertRaises(ValueError, f.peek, 1)
2462 self.assertRaises(ValueError, f.read)
2463 if hasattr(f, "read1"):
2464 self.assertRaises(ValueError, f.read1, 1024)
2465 if hasattr(f, "readinto"):
2466 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2467 self.assertRaises(ValueError, f.readline)
2468 self.assertRaises(ValueError, f.readlines)
2469 self.assertRaises(ValueError, f.seek, 0)
2470 self.assertRaises(ValueError, f.tell)
2471 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002472 self.assertRaises(ValueError, f.write,
2473 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002474 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002475 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002476
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002477 def test_blockingioerror(self):
2478 # Various BlockingIOError issues
2479 self.assertRaises(TypeError, self.BlockingIOError)
2480 self.assertRaises(TypeError, self.BlockingIOError, 1)
2481 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2482 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2483 b = self.BlockingIOError(1, "")
2484 self.assertEqual(b.characters_written, 0)
2485 class C(str):
2486 pass
2487 c = C("")
2488 b = self.BlockingIOError(1, c)
2489 c.b = b
2490 b.c = c
2491 wr = weakref.ref(c)
2492 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002493 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002494 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002495
2496 def test_abcs(self):
2497 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002498 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2499 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2500 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2501 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002502
2503 def _check_abc_inheritance(self, abcmodule):
2504 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002505 self.assertIsInstance(f, abcmodule.IOBase)
2506 self.assertIsInstance(f, abcmodule.RawIOBase)
2507 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2508 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002509 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002510 self.assertIsInstance(f, abcmodule.IOBase)
2511 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2512 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2513 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002514 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002515 self.assertIsInstance(f, abcmodule.IOBase)
2516 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2517 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2518 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002519
2520 def test_abc_inheritance(self):
2521 # Test implementations inherit from their respective ABCs
2522 self._check_abc_inheritance(self)
2523
2524 def test_abc_inheritance_official(self):
2525 # Test implementations inherit from the official ABCs of the
2526 # baseline "io" module.
2527 self._check_abc_inheritance(io)
2528
Antoine Pitroue033e062010-10-29 10:38:18 +00002529 def _check_warn_on_dealloc(self, *args, **kwargs):
2530 f = open(*args, **kwargs)
2531 r = repr(f)
2532 with self.assertWarns(ResourceWarning) as cm:
2533 f = None
2534 support.gc_collect()
2535 self.assertIn(r, str(cm.warning.args[0]))
2536
2537 def test_warn_on_dealloc(self):
2538 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
2539 self._check_warn_on_dealloc(support.TESTFN, "wb")
2540 self._check_warn_on_dealloc(support.TESTFN, "w")
2541
2542 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
2543 fds = []
2544 try:
2545 r, w = os.pipe()
2546 fds += r, w
2547 self._check_warn_on_dealloc(r, *args, **kwargs)
2548 # When using closefd=False, there's no warning
2549 r, w = os.pipe()
2550 fds += r, w
2551 with warnings.catch_warnings(record=True) as recorded:
2552 open(r, *args, closefd=False, **kwargs)
2553 support.gc_collect()
2554 self.assertEqual(recorded, [])
2555 finally:
2556 for fd in fds:
2557 try:
2558 os.close(fd)
2559 except EnvironmentError as e:
2560 if e.errno != errno.EBADF:
2561 raise
2562
2563 def test_warn_on_dealloc_fd(self):
2564 self._check_warn_on_dealloc_fd("rb", buffering=0)
2565 self._check_warn_on_dealloc_fd("rb")
2566 self._check_warn_on_dealloc_fd("r")
2567
2568
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002569class CMiscIOTest(MiscIOTest):
2570 io = io
2571
2572class PyMiscIOTest(MiscIOTest):
2573 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002574
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002575
2576@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2577class SignalsTest(unittest.TestCase):
2578
2579 def setUp(self):
2580 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2581
2582 def tearDown(self):
2583 signal.signal(signal.SIGALRM, self.oldalrm)
2584
2585 def alarm_interrupt(self, sig, frame):
2586 1/0
2587
2588 @unittest.skipUnless(threading, 'Threading required for this test.')
2589 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2590 """Check that a partial write, when it gets interrupted, properly
2591 invokes the signal handler."""
2592 read_results = []
2593 def _read():
2594 s = os.read(r, 1)
2595 read_results.append(s)
2596 t = threading.Thread(target=_read)
2597 t.daemon = True
2598 r, w = os.pipe()
2599 try:
2600 wio = self.io.open(w, **fdopen_kwargs)
2601 t.start()
2602 signal.alarm(1)
2603 # Fill the pipe enough that the write will be blocking.
2604 # It will be interrupted by the timer armed above. Since the
2605 # other thread has read one byte, the low-level write will
2606 # return with a successful (partial) result rather than an EINTR.
2607 # The buffered IO layer must check for pending signal
2608 # handlers, which in this case will invoke alarm_interrupt().
2609 self.assertRaises(ZeroDivisionError,
2610 wio.write, item * (1024 * 1024))
2611 t.join()
2612 # We got one byte, get another one and check that it isn't a
2613 # repeat of the first one.
2614 read_results.append(os.read(r, 1))
2615 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2616 finally:
2617 os.close(w)
2618 os.close(r)
2619 # This is deliberate. If we didn't close the file descriptor
2620 # before closing wio, wio would try to flush its internal
2621 # buffer, and block again.
2622 try:
2623 wio.close()
2624 except IOError as e:
2625 if e.errno != errno.EBADF:
2626 raise
2627
2628 def test_interrupted_write_unbuffered(self):
2629 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2630
2631 def test_interrupted_write_buffered(self):
2632 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2633
2634 def test_interrupted_write_text(self):
2635 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2636
2637class CSignalsTest(SignalsTest):
2638 io = io
2639
2640class PySignalsTest(SignalsTest):
2641 io = pyio
2642
2643
Guido van Rossum28524c72007-02-27 05:47:44 +00002644def test_main():
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002645 tests = (CIOTest, PyIOTest,
2646 CBufferedReaderTest, PyBufferedReaderTest,
2647 CBufferedWriterTest, PyBufferedWriterTest,
2648 CBufferedRWPairTest, PyBufferedRWPairTest,
2649 CBufferedRandomTest, PyBufferedRandomTest,
2650 StatefulIncrementalDecoderTest,
2651 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2652 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002653 CMiscIOTest, PyMiscIOTest,
2654 CSignalsTest, PySignalsTest,
2655 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002656
2657 # Put the namespaces of the IO module we are testing and some useful mock
2658 # classes in the __dict__ of each test.
2659 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou328ec742010-09-14 18:37:24 +00002660 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002661 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2662 c_io_ns = {name : getattr(io, name) for name in all_members}
2663 py_io_ns = {name : getattr(pyio, name) for name in all_members}
2664 globs = globals()
2665 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2666 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2667 # Avoid turning open into a bound method.
2668 py_io_ns["open"] = pyio.OpenWrapper
2669 for test in tests:
2670 if test.__name__.startswith("C"):
2671 for name, obj in c_io_ns.items():
2672 setattr(test, name, obj)
2673 elif test.__name__.startswith("Py"):
2674 for name, obj in py_io_ns.items():
2675 setattr(test, name, obj)
2676
2677 support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002678
2679if __name__ == "__main__":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002680 test_main()